Merge pull request #91761 from MikeSpreitzer/test-uneven-fairness

Add test for windup problem in APF
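The new test exercises a "windup" in the fair queuing logic: scenarios are now described by a uniformScenario struct, a client can be marked split so that half of its threads only start halfway through the evaluation period, and fairness is asserted separately for each half of the period against a max-min fair allocation computed by the new fairAlloc helper rather than against a simple equal split.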
Kubernetes Prow Robot 2020-07-15 06:04:37 -07:00 committed by GitHub
commit 30b0ebd6d4
3 changed files with 453 additions and 141 deletions


@@ -441,7 +441,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
bestQueueIdx, bestQueueLen = queueIdx, thisLen
}
})
-klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
+klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
return bestQueueIdx
}


@@ -20,6 +20,8 @@ import (
"context"
"fmt"
"math"
+"reflect"
+"sort"
"sync/atomic"
"testing"
"time"
@@ -32,7 +34,60 @@ import (
"k8s.io/klog/v2"
)
-type uniformScenario []uniformClient
+// fairAlloc computes the max-min fair allocation of the given
// capacity to the given demands (which slice is not side-effected).
func fairAlloc(demands []float64, capacity float64) []float64 {
count := len(demands)
indices := make([]int, count)
for i := 0; i < count; i++ {
indices[i] = i
}
sort.Slice(indices, func(i, j int) bool { return demands[indices[i]] < demands[indices[j]] })
alloc := make([]float64, count)
var next int
var prevAlloc float64
for ; next < count; next++ {
// `capacity` is how much remains assuming that
// all unvisited items get `prevAlloc`.
idx := indices[next]
demand := demands[idx]
if demand <= 0 {
continue
}
// `fullCapacityBite` is how much more capacity would be used
// if this and all following items get as much as this one
// is demanding.
fullCapacityBite := float64(count-next) * (demand - prevAlloc)
if fullCapacityBite > capacity {
break
}
prevAlloc = demand
alloc[idx] = demand
capacity -= fullCapacityBite
}
for j := next; j < count; j++ {
alloc[indices[j]] = prevAlloc + capacity/float64(count-next)
}
return alloc
}
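To see how the loop above plays out, take demands {3, 7, 9, 1} and capacity 14 (one of the cases checked below): visiting demands in ascending order, 1 fits (a bite of 4*1 = 4, leaving 10), then 3 fits (a bite of 3*(3-1) = 6, leaving 4), but raising the remaining two items from 3 to 7 would cost 2*(7-3) = 8 > 4, so the loop stops and the leftover 4 is split evenly: 3 + 4/2 = 5 each, giving the allocation {3, 5, 5, 1}.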
func TestFairAlloc(t *testing.T) {
if e, a := []float64{0, 0}, fairAlloc([]float64{0, 0}, 42); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got #%+v", e, a)
}
if e, a := []float64{42, 0}, fairAlloc([]float64{47, 0}, 42); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got #%+v", e, a)
}
if e, a := []float64{1, 41}, fairAlloc([]float64{1, 47}, 42); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got #%+v", e, a)
}
if e, a := []float64{3, 5, 5, 1}, fairAlloc([]float64{3, 7, 9, 1}, 14); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got #%+v", e, a)
}
if e, a := []float64{1, 9, 7, 3}, fairAlloc([]float64{1, 9, 7, 3}, 21); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#+v, got #%+v", e, a)
}
}
type uniformClient struct {
hash uint64
@@ -40,150 +95,246 @@ type uniformClient struct {
nCalls int
// duration for a simulated synchronous call
execDuration time.Duration
-// duration for simulated "other work"
+// duration for simulated "other work". This can be negative,
+// causing a request to be launched a certain amount of time
+// before the previous one finishes.
thinkDuration time.Duration
+// When true indicates that only half the specified number of
+// threads should run during the first half of the evaluation
+// period
+split bool
}
-// exerciseQueueSetUniformScenario runs a scenario based on the given set of uniform clients.
+// uniformScenario describes a scenario based on the given set of uniform clients.
// Each uniform client specifies a number of threads, each of which alternates between thinking
// and making a synchronous request through the QueueSet.
-// This function measures how much concurrency each client got, on average, over
-// the initial evalDuration and tests to see whether they all got about the same amount.
-// Each client needs to be demanding enough to use this amount, otherwise the fair result
-// is not equal amounts and the simple test in this function would not accurately test fairness.
-// expectPass indicates whether the QueueSet is expected to be fair.
-// expectedAllRequests indicates whether all requests are expected to get dispatched.
-func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario,
-evalDuration time.Duration,
-expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool,
-rejectReason string,
-clk *clock.FakeEventClock, counter counter.GoRoutineCounter) {
+// The test measures how much concurrency each client got, on average, over
+// the initial evalDuration and tests to see whether they all got about the fair amount.
+// Each client needs to be demanding enough to use more than its fair share,
+// or overall care needs to be taken about timing so that scheduling details
+// do not cause any client to actually request a significantly smaller share
+// than it theoretically should.
+// expectFair indicate whether the QueueSet is expected to be
+// fair in the respective halves of a split scenario;
+// in a non-split scenario this is a singleton with one expectation.
+// expectAllRequests indicates whether all requests are expected to get dispatched.
+type uniformScenario struct {
name string
qs fq.QueueSet
clients []uniformClient
concurrencyLimit int
evalDuration time.Duration
expectFair []bool
expectAllRequests bool
evalInqueueMetrics, evalExecutingMetrics bool
rejectReason string
clk *clock.FakeEventClock
counter counter.GoRoutineCounter
}
-now := time.Now()
-t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter)
-integrators := make([]test.Integrator, len(sc))
-var failedCount uint64
-expectedInqueue := ""
-expectedExecuting := ""
-if expectInqueueMetrics || expectExecutingMetrics {
+func (us uniformScenario) exercise(t *testing.T) {
+uss := uniformScenarioState{
+t: t,
+uniformScenario: us,
+startTime: time.Now(),
+integrators: make([]test.Integrator, len(us.clients)),
+executions: make([]int32, len(us.clients)),
rejects: make([]int32, len(us.clients)),
}
for _, uc := range us.clients {
uss.doSplit = uss.doSplit || uc.split
}
uss.exercise()
}
type uniformScenarioState struct {
t *testing.T
uniformScenario
startTime time.Time
doSplit bool
integrators []test.Integrator
failedCount uint64
expectedInqueue, expectedExecuting string
executions, rejects []int32
}
func (uss *uniformScenarioState) exercise() {
uss.t.Logf("%s: Start %s, doSplit=%v, clk=%p, grc=%p", uss.startTime.Format(nsTimeFmt), uss.name, uss.doSplit, uss.clk, uss.counter)
if uss.evalInqueueMetrics || uss.evalExecutingMetrics {
metrics.Reset()
}
-executions := make([]int32, len(sc))
-rejects := make([]int32, len(sc))
-for i, uc := range sc {
-integrators[i] = test.NewIntegrator(clk)
+for i, uc := range uss.clients {
+uss.integrators[i] = test.NewIntegrator(uss.clk)
fsName := fmt.Sprintf("client%d", i)
-expectedInqueue = expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n")
+uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n")
for j := 0; j < uc.nThreads; j++ {
-counter.Add(1)
-go func(i, j int, uc uniformClient, igr test.Integrator) {
-for k := 0; k < uc.nCalls; k++ {
-ClockWait(clk, counter, uc.thinkDuration)
-req, idle := qs.StartRequest(context.Background(), uc.hash, "", fsName, name, []int{i, j, k})
-t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle)
-if req == nil {
-atomic.AddUint64(&failedCount, 1)
-atomic.AddInt32(&rejects[i], 1)
-break
-}
-if idle {
-t.Error("got request but QueueSet reported idle")
-}
-var executed bool
-idle2 := req.Finish(func() {
-executed = true
-t.Logf("%s: %d, %d, %d executing", clk.Now().Format(nsTimeFmt), i, j, k)
-atomic.AddInt32(&executions[i], 1)
-igr.Add(1)
-ClockWait(clk, counter, uc.execDuration)
-igr.Add(-1)
-})
-t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2)
-if !executed {
-atomic.AddUint64(&failedCount, 1)
-atomic.AddInt32(&rejects[i], 1)
-}
-}
-counter.Add(-1)
-}(i, j, uc, integrators[i])
+ust := uniformScenarioThread{
+uss: uss,
+i: i,
+j: j,
+nCalls: uc.nCalls,
+uc: uc,
+igr: uss.integrators[i],
+fsName: fsName,
+}
+ust.start()
}
}
-lim := now.Add(evalDuration)
-clk.Run(&lim)
+if uss.doSplit {
+uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectFair[0])
-clk.SetTime(lim)
-t.Logf("%s: End", clk.Now().Format(nsTimeFmt))
-results := make([]test.IntegratorResults, len(sc))
-var sumOfAvg float64
-for i := range sc {
-results[i] = integrators[i].GetResults()
-sumOfAvg += results[i].Average
-}
-idealAverage := sumOfAvg / float64(len(sc))
-passes := make([]bool, len(sc))
-allPass := true
-for i := range sc {
-relDiff := (results[i].Average - idealAverage) / idealAverage
-passes[i] = math.Abs(relDiff) <= 0.1
-allPass = allPass && passes[i]
-}
-for i := range sc {
-if allPass != expectPass {
-t.Errorf("Class %d got an Average of %v but the ideal was %v", i, results[i].Average, idealAverage)
-} else {
-t.Logf("Class %d got an Average of %v and the ideal was %v", i, results[i].Average, idealAverage)
-}
}
+uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectFair[len(uss.expectFair)-1])
+uss.clk.Run(nil)
+uss.finalReview()
+}
-clk.Run(nil)
-if expectedAllRequests && failedCount > 0 {
-t.Errorf("Expected all requests to be successful but got %v failed requests", failedCount)
-} else if !expectedAllRequests && failedCount == 0 {
-t.Errorf("Expected failed requests but all requests succeeded")
+type uniformScenarioThread struct {
+uss *uniformScenarioState
+i, j int
+nCalls int
+uc uniformClient
igr test.Integrator
fsName string
}
func (ust *uniformScenarioThread) start() {
initialDelay := time.Duration(11*ust.j + 2*ust.i)
if ust.uc.split && ust.j >= ust.uc.nThreads/2 {
initialDelay += ust.uss.evalDuration / 2
ust.nCalls = ust.nCalls / 2
}
-if expectInqueueMetrics {
+ust.uss.clk.EventAfterDuration(ust.genCallK(0), initialDelay)
}
// generates an EventFunc that forks a goroutine to do call k
func (ust *uniformScenarioThread) genCallK(k int) func(time.Time) {
return func(time.Time) {
// As an EventFunc, this has to return without waiting
// for time to pass, and so cannot do callK(k) itself.
ust.uss.counter.Add(1)
go func() {
ust.callK(k)
ust.uss.counter.Add(-1)
}()
}
}
func (ust *uniformScenarioThread) callK(k int) {
if k >= ust.nCalls {
return
}
req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k})
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
if req == nil {
atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
return
}
if idle {
ust.uss.t.Error("got request but QueueSet reported idle")
}
var executed bool
idle2 := req.Finish(func() {
executed = true
ust.uss.t.Logf("%s: %d, %d, %d executing", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k)
atomic.AddInt32(&ust.uss.executions[ust.i], 1)
ust.igr.Add(1)
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration)
ust.igr.Add(-1)
})
ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
if !executed {
atomic.AddUint64(&ust.uss.failedCount, 1)
atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
}
}
func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) {
uss.clk.Run(&lim)
uss.clk.SetTime(lim)
if uss.doSplit && !last {
uss.t.Logf("%s: End of first half", uss.clk.Now().Format(nsTimeFmt))
} else {
uss.t.Logf("%s: End", uss.clk.Now().Format(nsTimeFmt))
}
demands := make([]float64, len(uss.clients))
averages := make([]float64, len(uss.clients))
for i, uc := range uss.clients {
nThreads := uc.nThreads
if uc.split && !last {
nThreads = nThreads / 2
}
demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration)
averages[i] = uss.integrators[i].Reset().Average
}
fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit))
for i := range uss.clients {
var gotFair bool
if fairAverages[i] > 0 {
relDiff := (averages[i] - fairAverages[i]) / fairAverages[i]
gotFair = math.Abs(relDiff) <= 0.1
} else {
gotFair = math.Abs(averages[i]) <= 0.1
}
if gotFair != expectFair {
uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
} else {
uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
}
}
}
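Spelled out, evalTo judges fairness as follows: each client's demand is its thread count times the duty cycle execDuration/(execDuration+thinkDuration), the fair targets come from fairAlloc over those demands and the concurrency limit, and each client's observed average concurrency must land within 10% of its target (or stay near zero when the target is zero). In a split scenario the integrators are restarted via the new Integrator.Reset, so the first evaluation covers the first half of the period with the split clients' late threads excluded from the demand, and the second evaluation covers only the second half.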
func (uss *uniformScenarioState) finalReview() {
if uss.expectAllRequests && uss.failedCount > 0 {
uss.t.Errorf("Expected all requests to be successful but got %v failed requests", uss.failedCount)
} else if !uss.expectAllRequests && uss.failedCount == 0 {
uss.t.Errorf("Expected failed requests but all requests succeeded")
}
if uss.evalInqueueMetrics {
e := `
# HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system
# TYPE apiserver_flowcontrol_current_inqueue_requests gauge
-` + expectedInqueue
+` + uss.expectedInqueue
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests")
if err != nil {
-t.Error(err)
+uss.t.Error(err)
} else {
-t.Log("Success with" + e)
+uss.t.Log("Success with" + e)
}
}
expectedRejects := ""
-for i := range sc {
+for i := range uss.clients {
fsName := fmt.Sprintf("client%d", i)
-if atomic.AddInt32(&executions[i], 0) > 0 {
+if atomic.AddInt32(&uss.executions[i], 0) > 0 {
-expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n")
+uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n")
}
-if atomic.AddInt32(&rejects[i], 0) > 0 {
+if atomic.AddInt32(&uss.rejects[i], 0) > 0 {
-expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, name, rejectReason, rejects[i], "\n")
+expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n")
}
}
-if expectExecutingMetrics && len(expectedExecuting) > 0 {
+if uss.evalExecutingMetrics && len(uss.expectedExecuting) > 0 {
e := `
# HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system
# TYPE apiserver_flowcontrol_current_executing_requests gauge
-` + expectedExecuting
+` + uss.expectedExecuting
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests")
if err != nil {
-t.Error(err)
+uss.t.Error(err)
} else {
-t.Log("Success with" + e)
+uss.t.Log("Success with" + e)
}
}
-if expectExecutingMetrics && len(expectedRejects) > 0 {
+if uss.evalExecutingMetrics && len(expectedRejects) > 0 {
e := `
# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
# TYPE apiserver_flowcontrol_rejected_requests_total counter
` + expectedRejects
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total")
if err != nil {
-t.Error(err)
+uss.t.Error(err)
} else {
-t.Log("Success with" + e)
+uss.t.Log("Success with" + e)
}
}
}
@@ -204,7 +355,7 @@ func init() {
klog.InitFlags(nil)
}
-// TestNoRestraint should fail because the dummy QueueSet exercises no control
+// TestNoRestraint tests whether the no-restraint factory gives every client what it asks for
func TestNoRestraint(t *testing.T) {
metrics.Register()
now := time.Now()
@@ -214,23 +365,32 @@ func TestNoRestraint(t *testing.T) {
t.Fatal(err)
}
nr := nrc.Complete(fq.DispatchingConfig{})
-exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
-{1001001001, 5, 10, time.Second, time.Second},
-{2002002002, 2, 10, time.Second, time.Second / 2},
-}, time.Second*10, false, true, false, false, "", clk, counter)
+uniformScenario{name: "NoRestraint",
+qs: nr,
+clients: []uniformClient{
+{1001001001, 5, 10, time.Second, time.Second, false},
+{2002002002, 2, 10, time.Second, time.Second / 2, false},
+},
+concurrencyLimit: 10,
+evalDuration: time.Second * 15,
+expectFair: []bool{true},
+expectAllRequests: true,
+clk: clk,
+counter: counter,
+}.exercise(t)
}
-func TestUniformFlows(t *testing.T) {
+func TestUniformFlowsHandSize1(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
-Name: "TestUniformFlows",
-DesiredNumQueues: 8,
-QueueLengthLimit: 6,
-HandSize: 3,
+Name: "TestUniformFlowsHandSize1",
+DesiredNumQueues: 9,
+QueueLengthLimit: 8,
+HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg)
@@ -239,22 +399,33 @@ func TestUniformFlows(t *testing.T) {
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
-exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
-{1001001001, 5, 10, time.Second, time.Second},
-{2002002002, 5, 10, time.Second, time.Second},
-}, time.Second*20, true, true, true, true, "", clk, counter)
+uniformScenario{name: qCfg.Name,
+qs: qs,
+clients: []uniformClient{
+{1001001001, 8, 20, time.Second, time.Second - 1, false},
+{2002002002, 8, 20, time.Second, time.Second - 1, false},
+},
+concurrencyLimit: 4,
+evalDuration: time.Second * 50,
+expectFair: []bool{true},
+expectAllRequests: true,
+evalInqueueMetrics: true,
+evalExecutingMetrics: true,
+clk: clk,
+counter: counter,
+}.exercise(t)
}
-func TestDifferentFlows(t *testing.T) {
+func TestUniformFlowsHandSize3(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
-Name: "TestDifferentFlows",
+Name: "TestUniformFlowsHandSize3",
DesiredNumQueues: 8,
-QueueLengthLimit: 6,
+QueueLengthLimit: 4,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
@@ -263,11 +434,128 @@ func TestDifferentFlows(t *testing.T) {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 8, 30, time.Second, time.Second - 1, false},
{2002002002, 8, 30, time.Second, time.Second - 1, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 60,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
-exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
-{1001001001, 6, 10, time.Second, time.Second},
-{2002002002, 5, 15, time.Second, time.Second / 2},
-}, time.Second*20, true, true, true, true, "", clk, counter)
+func TestDifferentFlowsExpectEqual(t *testing.T) {
+metrics.Register()
+now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectEqual",
DesiredNumQueues: 9,
QueueLengthLimit: 8,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg)
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 8, 20, time.Second, time.Second, false},
{2002002002, 7, 30, time.Second, time.Second / 2, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 40,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestDifferentFlowsExpectUnequal(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "DiffFlowsExpectUnequal",
DesiredNumQueues: 9,
QueueLengthLimit: 6,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg)
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})
uniformScenario{name: qCfg.Name,
qs: qs,
clients: []uniformClient{
{1001001001, 4, 20, time.Second, time.Second - 1, false},
{2002002002, 2, 20, time.Second, time.Second - 1, false},
},
concurrencyLimit: 3,
evalDuration: time.Second * 20,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
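For reference, the expected numbers in the scenario above work out as follows: each thread's duty cycle is time.Second / (2*time.Second - 1ns), roughly 0.5, so the two clients demand about 2.0 and 1.0 units of concurrency; fairAlloc({2, 1}, 3) allocates {2, 1}, which is why this scenario expects fairness yet unequal averages: each client should receive its full, unequal, demand.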
func TestWindup(t *testing.T) {
metrics.Register()
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
qCfg := fq.QueuingConfig{
Name: "TestWindup",
DesiredNumQueues: 9,
QueueLengthLimit: 6,
HandSize: 1,
RequestWaitLimit: 10 * time.Minute,
}
qsc, err := qsf.BeginConstruction(qCfg)
if err != nil {
t.Fatal(err)
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})
uniformScenario{name: qCfg.Name, qs: qs,
clients: []uniformClient{
{1001001001, 2, 40, time.Second, -1, false},
{2002002002, 2, 40, time.Second, -1, true},
},
concurrencyLimit: 3,
evalDuration: time.Second * 40,
expectFair: []bool{true, false},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}
func TestDifferentFlowsWithoutQueuing(t *testing.T) {
@@ -286,20 +574,20 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
-exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
-{1001001001, 6, 10, time.Second, 57 * time.Millisecond},
-{2002002002, 4, 15, time.Second, 750 * time.Millisecond},
-}, time.Second*13, false, false, false, true, "concurrency-limit", clk, counter)
-err = metrics.GatherAndCompare(`
-# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
-# TYPE apiserver_flowcontrol_rejected_requests_total counter
-apiserver_flowcontrol_rejected_requests_total{flowSchema="client0",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 2
-apiserver_flowcontrol_rejected_requests_total{flowSchema="client1",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 4
-`,
-"apiserver_flowcontrol_rejected_requests_total")
-if err != nil {
-t.Fatal(err)
-}
+uniformScenario{name: qCfg.Name,
+qs: qs,
+clients: []uniformClient{
+{1001001001, 6, 10, time.Second, 57 * time.Millisecond, false},
+{2002002002, 4, 15, time.Second, 750 * time.Millisecond, false},
+},
+concurrencyLimit: 4,
+evalDuration: time.Second * 13,
+expectFair: []bool{false},
+evalExecutingMetrics: true,
+rejectReason: "concurrency-limit",
+clk: clk,
+counter: counter,
+}.exercise(t)
}
func TestTimeout(t *testing.T) {
@@ -321,9 +609,20 @@ func TestTimeout(t *testing.T) {
}
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
-exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
-{1001001001, 5, 100, time.Second, time.Second},
-}, time.Second*10, true, false, true, true, "time-out", clk, counter)
+uniformScenario{name: qCfg.Name,
+qs: qs,
+clients: []uniformClient{
{1001001001, 5, 100, time.Second, time.Second, false},
},
concurrencyLimit: 1,
evalDuration: time.Second * 10,
expectFair: []bool{true},
evalInqueueMetrics: true,
evalExecutingMetrics: true,
rejectReason: "time-out",
clk: clk,
counter: counter,
}.exercise(t)
}
func TestContextCancel(t *testing.T) {


@@ -32,6 +32,7 @@ type Integrator interface {
Set(float64) // set the value of X
Add(float64) // add the given quantity to X
GetResults() IntegratorResults
+Reset() IntegratorResults // restart the integration from now
}
// IntegratorResults holds statistical abstracts of the integration
@@ -83,6 +84,10 @@ func (igr *integrator) updateLocked() {
func (igr *integrator) GetResults() (results IntegratorResults) {
igr.Lock()
defer func() { igr.Unlock() }()
+return igr.getResultsLocked()
+}
+func (igr *integrator) getResultsLocked() (results IntegratorResults) {
igr.updateLocked()
results.Duration = igr.integrals[0]
if results.Duration <= 0 {
@@ -101,3 +106,11 @@ func (igr *integrator) GetResults() (results IntegratorResults) {
}
return
}
func (igr *integrator) Reset() (results IntegratorResults) {
igr.Lock()
defer func() { igr.Unlock() }()
results = igr.getResultsLocked()
igr.integrals = [3]float64{0, 0, 0}
return
}