diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 316ca34794c..3deceb9127e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -441,7 +441,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte bestQueueIdx, bestQueueLen = queueIdx, thisLen } }) - klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) + klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) return bestQueueIdx } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index e9c700efcc4..c4157a696f1 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "math" + "reflect" + "sort" "sync/atomic" "testing" "time" @@ -32,7 +34,60 @@ import ( "k8s.io/klog/v2" ) -type uniformScenario []uniformClient +// fairAlloc computes the max-min fair allocation of the given +// capacity to the given demands (which slice is not side-effected). +func fairAlloc(demands []float64, capacity float64) []float64 { + count := len(demands) + indices := make([]int, count) + for i := 0; i < count; i++ { + indices[i] = i + } + sort.Slice(indices, func(i, j int) bool { return demands[indices[i]] < demands[indices[j]] }) + alloc := make([]float64, count) + var next int + var prevAlloc float64 + for ; next < count; next++ { + // `capacity` is how much remains assuming that + // all unvisited items get `prevAlloc`. + idx := indices[next] + demand := demands[idx] + if demand <= 0 { + continue + } + // `fullCapacityBite` is how much more capacity would be used + // if this and all following items get as much as this one + // is demanding. + fullCapacityBite := float64(count-next) * (demand - prevAlloc) + if fullCapacityBite > capacity { + break + } + prevAlloc = demand + alloc[idx] = demand + capacity -= fullCapacityBite + } + for j := next; j < count; j++ { + alloc[indices[j]] = prevAlloc + capacity/float64(count-next) + } + return alloc +} + +func TestFairAlloc(t *testing.T) { + if e, a := []float64{0, 0}, fairAlloc([]float64{0, 0}, 42); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#+v, got #%+v", e, a) + } + if e, a := []float64{42, 0}, fairAlloc([]float64{47, 0}, 42); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#+v, got #%+v", e, a) + } + if e, a := []float64{1, 41}, fairAlloc([]float64{1, 47}, 42); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#+v, got #%+v", e, a) + } + if e, a := []float64{3, 5, 5, 1}, fairAlloc([]float64{3, 7, 9, 1}, 14); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#+v, got #%+v", e, a) + } + if e, a := []float64{1, 9, 7, 3}, fairAlloc([]float64{1, 9, 7, 3}, 21); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#+v, got #%+v", e, a) + } +} type uniformClient struct { hash uint64 @@ -40,150 +95,246 @@ type uniformClient struct { nCalls int // duration for a simulated synchronous call execDuration time.Duration - // duration for simulated "other work" + // duration for simulated "other work". This can be negative, + // causing a request to be launched a certain amount of time + // before the previous one finishes. thinkDuration time.Duration + // When true indicates that only half the specified number of + // threads should run during the first half of the evaluation + // period + split bool } -// exerciseQueueSetUniformScenario runs a scenario based on the given set of uniform clients. +// uniformScenario describes a scenario based on the given set of uniform clients. // Each uniform client specifies a number of threads, each of which alternates between thinking // and making a synchronous request through the QueueSet. -// This function measures how much concurrency each client got, on average, over -// the initial evalDuration and tests to see whether they all got about the same amount. -// Each client needs to be demanding enough to use this amount, otherwise the fair result -// is not equal amounts and the simple test in this function would not accurately test fairness. -// expectPass indicates whether the QueueSet is expected to be fair. -// expectedAllRequests indicates whether all requests are expected to get dispatched. -func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario, - evalDuration time.Duration, - expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool, - rejectReason string, - clk *clock.FakeEventClock, counter counter.GoRoutineCounter) { +// The test measures how much concurrency each client got, on average, over +// the initial evalDuration and tests to see whether they all got about the fair amount. +// Each client needs to be demanding enough to use more than its fair share, +// or overall care needs to be taken about timing so that scheduling details +// do not cause any client to actually request a significantly smaller share +// than it theoretically should. +// expectFair indicate whether the QueueSet is expected to be +// fair in the respective halves of a split scenario; +// in a non-split scenario this is a singleton with one expectation. +// expectAllRequests indicates whether all requests are expected to get dispatched. +type uniformScenario struct { + name string + qs fq.QueueSet + clients []uniformClient + concurrencyLimit int + evalDuration time.Duration + expectFair []bool + expectAllRequests bool + evalInqueueMetrics, evalExecutingMetrics bool + rejectReason string + clk *clock.FakeEventClock + counter counter.GoRoutineCounter +} - now := time.Now() - t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter) - integrators := make([]test.Integrator, len(sc)) - var failedCount uint64 - expectedInqueue := "" - expectedExecuting := "" - if expectInqueueMetrics || expectExecutingMetrics { +func (us uniformScenario) exercise(t *testing.T) { + uss := uniformScenarioState{ + t: t, + uniformScenario: us, + startTime: time.Now(), + integrators: make([]test.Integrator, len(us.clients)), + executions: make([]int32, len(us.clients)), + rejects: make([]int32, len(us.clients)), + } + for _, uc := range us.clients { + uss.doSplit = uss.doSplit || uc.split + } + uss.exercise() +} + +type uniformScenarioState struct { + t *testing.T + uniformScenario + startTime time.Time + doSplit bool + integrators []test.Integrator + failedCount uint64 + expectedInqueue, expectedExecuting string + executions, rejects []int32 +} + +func (uss *uniformScenarioState) exercise() { + uss.t.Logf("%s: Start %s, doSplit=%v, clk=%p, grc=%p", uss.startTime.Format(nsTimeFmt), uss.name, uss.doSplit, uss.clk, uss.counter) + if uss.evalInqueueMetrics || uss.evalExecutingMetrics { metrics.Reset() } - executions := make([]int32, len(sc)) - rejects := make([]int32, len(sc)) - for i, uc := range sc { - integrators[i] = test.NewIntegrator(clk) + for i, uc := range uss.clients { + uss.integrators[i] = test.NewIntegrator(uss.clk) fsName := fmt.Sprintf("client%d", i) - expectedInqueue = expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n") + uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n") for j := 0; j < uc.nThreads; j++ { - counter.Add(1) - go func(i, j int, uc uniformClient, igr test.Integrator) { - for k := 0; k < uc.nCalls; k++ { - ClockWait(clk, counter, uc.thinkDuration) - req, idle := qs.StartRequest(context.Background(), uc.hash, "", fsName, name, []int{i, j, k}) - t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle) - if req == nil { - atomic.AddUint64(&failedCount, 1) - atomic.AddInt32(&rejects[i], 1) - break - } - if idle { - t.Error("got request but QueueSet reported idle") - } - var executed bool - idle2 := req.Finish(func() { - executed = true - t.Logf("%s: %d, %d, %d executing", clk.Now().Format(nsTimeFmt), i, j, k) - atomic.AddInt32(&executions[i], 1) - igr.Add(1) - ClockWait(clk, counter, uc.execDuration) - igr.Add(-1) - }) - t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2) - if !executed { - atomic.AddUint64(&failedCount, 1) - atomic.AddInt32(&rejects[i], 1) - } - } - counter.Add(-1) - }(i, j, uc, integrators[i]) + ust := uniformScenarioThread{ + uss: uss, + i: i, + j: j, + nCalls: uc.nCalls, + uc: uc, + igr: uss.integrators[i], + fsName: fsName, + } + ust.start() } } - lim := now.Add(evalDuration) - clk.Run(&lim) - clk.SetTime(lim) - t.Logf("%s: End", clk.Now().Format(nsTimeFmt)) - results := make([]test.IntegratorResults, len(sc)) - var sumOfAvg float64 - for i := range sc { - results[i] = integrators[i].GetResults() - sumOfAvg += results[i].Average - } - idealAverage := sumOfAvg / float64(len(sc)) - passes := make([]bool, len(sc)) - allPass := true - for i := range sc { - relDiff := (results[i].Average - idealAverage) / idealAverage - passes[i] = math.Abs(relDiff) <= 0.1 - allPass = allPass && passes[i] - } - for i := range sc { - if allPass != expectPass { - t.Errorf("Class %d got an Average of %v but the ideal was %v", i, results[i].Average, idealAverage) - } else { - t.Logf("Class %d got an Average of %v and the ideal was %v", i, results[i].Average, idealAverage) - } + if uss.doSplit { + uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectFair[0]) } + uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectFair[len(uss.expectFair)-1]) + uss.clk.Run(nil) + uss.finalReview() +} - clk.Run(nil) - if expectedAllRequests && failedCount > 0 { - t.Errorf("Expected all requests to be successful but got %v failed requests", failedCount) - } else if !expectedAllRequests && failedCount == 0 { - t.Errorf("Expected failed requests but all requests succeeded") +type uniformScenarioThread struct { + uss *uniformScenarioState + i, j int + nCalls int + uc uniformClient + igr test.Integrator + fsName string +} + +func (ust *uniformScenarioThread) start() { + initialDelay := time.Duration(11*ust.j + 2*ust.i) + if ust.uc.split && ust.j >= ust.uc.nThreads/2 { + initialDelay += ust.uss.evalDuration / 2 + ust.nCalls = ust.nCalls / 2 } - if expectInqueueMetrics { + ust.uss.clk.EventAfterDuration(ust.genCallK(0), initialDelay) +} + +// generates an EventFunc that forks a goroutine to do call k +func (ust *uniformScenarioThread) genCallK(k int) func(time.Time) { + return func(time.Time) { + // As an EventFunc, this has to return without waiting + // for time to pass, and so cannot do callK(k) itself. + ust.uss.counter.Add(1) + go func() { + ust.callK(k) + ust.uss.counter.Add(-1) + }() + } +} + +func (ust *uniformScenarioThread) callK(k int) { + if k >= ust.nCalls { + return + } + req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}) + ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) + if req == nil { + atomic.AddUint64(&ust.uss.failedCount, 1) + atomic.AddInt32(&ust.uss.rejects[ust.i], 1) + return + } + if idle { + ust.uss.t.Error("got request but QueueSet reported idle") + } + var executed bool + idle2 := req.Finish(func() { + executed = true + ust.uss.t.Logf("%s: %d, %d, %d executing", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k) + atomic.AddInt32(&ust.uss.executions[ust.i], 1) + ust.igr.Add(1) + ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) + ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration) + ust.igr.Add(-1) + }) + ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2) + if !executed { + atomic.AddUint64(&ust.uss.failedCount, 1) + atomic.AddInt32(&ust.uss.rejects[ust.i], 1) + } +} + +func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) { + uss.clk.Run(&lim) + uss.clk.SetTime(lim) + if uss.doSplit && !last { + uss.t.Logf("%s: End of first half", uss.clk.Now().Format(nsTimeFmt)) + } else { + uss.t.Logf("%s: End", uss.clk.Now().Format(nsTimeFmt)) + } + demands := make([]float64, len(uss.clients)) + averages := make([]float64, len(uss.clients)) + for i, uc := range uss.clients { + nThreads := uc.nThreads + if uc.split && !last { + nThreads = nThreads / 2 + } + demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration) + averages[i] = uss.integrators[i].Reset().Average + } + fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit)) + for i := range uss.clients { + var gotFair bool + if fairAverages[i] > 0 { + relDiff := (averages[i] - fairAverages[i]) / fairAverages[i] + gotFair = math.Abs(relDiff) <= 0.1 + } else { + gotFair = math.Abs(averages[i]) <= 0.1 + } + if gotFair != expectFair { + uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) + } else { + uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) + } + } +} + +func (uss *uniformScenarioState) finalReview() { + if uss.expectAllRequests && uss.failedCount > 0 { + uss.t.Errorf("Expected all requests to be successful but got %v failed requests", uss.failedCount) + } else if !uss.expectAllRequests && uss.failedCount == 0 { + uss.t.Errorf("Expected failed requests but all requests succeeded") + } + if uss.evalInqueueMetrics { e := ` # HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system # TYPE apiserver_flowcontrol_current_inqueue_requests gauge -` + expectedInqueue +` + uss.expectedInqueue err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests") if err != nil { - t.Error(err) + uss.t.Error(err) } else { - t.Log("Success with" + e) + uss.t.Log("Success with" + e) } } expectedRejects := "" - for i := range sc { + for i := range uss.clients { fsName := fmt.Sprintf("client%d", i) - if atomic.AddInt32(&executions[i], 0) > 0 { - expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n") + if atomic.AddInt32(&uss.executions[i], 0) > 0 { + uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n") } - if atomic.AddInt32(&rejects[i], 0) > 0 { - expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, name, rejectReason, rejects[i], "\n") + if atomic.AddInt32(&uss.rejects[i], 0) > 0 { + expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n") } } - if expectExecutingMetrics && len(expectedExecuting) > 0 { + if uss.evalExecutingMetrics && len(uss.expectedExecuting) > 0 { e := ` # HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system # TYPE apiserver_flowcontrol_current_executing_requests gauge -` + expectedExecuting +` + uss.expectedExecuting err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests") if err != nil { - t.Error(err) + uss.t.Error(err) } else { - t.Log("Success with" + e) + uss.t.Log("Success with" + e) } } - if expectExecutingMetrics && len(expectedRejects) > 0 { + if uss.evalExecutingMetrics && len(expectedRejects) > 0 { e := ` # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system # TYPE apiserver_flowcontrol_rejected_requests_total counter ` + expectedRejects err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total") if err != nil { - t.Error(err) + uss.t.Error(err) } else { - t.Log("Success with" + e) + uss.t.Log("Success with" + e) } } } @@ -204,7 +355,7 @@ func init() { klog.InitFlags(nil) } -// TestNoRestraint should fail because the dummy QueueSet exercises no control +// TestNoRestraint tests whether the no-restraint factory gives every client what it asks for func TestNoRestraint(t *testing.T) { metrics.Register() now := time.Now() @@ -214,23 +365,32 @@ func TestNoRestraint(t *testing.T) { t.Fatal(err) } nr := nrc.Complete(fq.DispatchingConfig{}) - exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ - {1001001001, 5, 10, time.Second, time.Second}, - {2002002002, 2, 10, time.Second, time.Second / 2}, - }, time.Second*10, false, true, false, false, "", clk, counter) + uniformScenario{name: "NoRestraint", + qs: nr, + clients: []uniformClient{ + {1001001001, 5, 10, time.Second, time.Second, false}, + {2002002002, 2, 10, time.Second, time.Second / 2, false}, + }, + concurrencyLimit: 10, + evalDuration: time.Second * 15, + expectFair: []bool{true}, + expectAllRequests: true, + clk: clk, + counter: counter, + }.exercise(t) } -func TestUniformFlows(t *testing.T) { +func TestUniformFlowsHandSize1(t *testing.T) { metrics.Register() now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ - Name: "TestUniformFlows", - DesiredNumQueues: 8, - QueueLengthLimit: 6, - HandSize: 3, + Name: "TestUniformFlowsHandSize1", + DesiredNumQueues: 9, + QueueLengthLimit: 8, + HandSize: 1, RequestWaitLimit: 10 * time.Minute, } qsc, err := qsf.BeginConstruction(qCfg) @@ -239,22 +399,33 @@ func TestUniformFlows(t *testing.T) { } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) - exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ - {1001001001, 5, 10, time.Second, time.Second}, - {2002002002, 5, 10, time.Second, time.Second}, - }, time.Second*20, true, true, true, true, "", clk, counter) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + {1001001001, 8, 20, time.Second, time.Second - 1, false}, + {2002002002, 8, 20, time.Second, time.Second - 1, false}, + }, + concurrencyLimit: 4, + evalDuration: time.Second * 50, + expectFair: []bool{true}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) } -func TestDifferentFlows(t *testing.T) { +func TestUniformFlowsHandSize3(t *testing.T) { metrics.Register() now := time.Now() clk, counter := clock.NewFakeEventClock(now, 0, nil) qsf := NewQueueSetFactory(clk, counter) qCfg := fq.QueuingConfig{ - Name: "TestDifferentFlows", + Name: "TestUniformFlowsHandSize3", DesiredNumQueues: 8, - QueueLengthLimit: 6, + QueueLengthLimit: 4, HandSize: 3, RequestWaitLimit: 10 * time.Minute, } @@ -263,11 +434,128 @@ func TestDifferentFlows(t *testing.T) { t.Fatal(err) } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + {1001001001, 8, 30, time.Second, time.Second - 1, false}, + {2002002002, 8, 30, time.Second, time.Second - 1, false}, + }, + concurrencyLimit: 4, + evalDuration: time.Second * 60, + expectFair: []bool{true}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} - exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ - {1001001001, 6, 10, time.Second, time.Second}, - {2002002002, 5, 15, time.Second, time.Second / 2}, - }, time.Second*20, true, true, true, true, "", clk, counter) +func TestDifferentFlowsExpectEqual(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + qCfg := fq.QueuingConfig{ + Name: "DiffFlowsExpectEqual", + DesiredNumQueues: 9, + QueueLengthLimit: 8, + HandSize: 1, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) + + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + {1001001001, 8, 20, time.Second, time.Second, false}, + {2002002002, 7, 30, time.Second, time.Second / 2, false}, + }, + concurrencyLimit: 4, + evalDuration: time.Second * 40, + expectFair: []bool{true}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + +func TestDifferentFlowsExpectUnequal(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + qCfg := fq.QueuingConfig{ + Name: "DiffFlowsExpectUnequal", + DesiredNumQueues: 9, + QueueLengthLimit: 6, + HandSize: 1, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3}) + + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + {1001001001, 4, 20, time.Second, time.Second - 1, false}, + {2002002002, 2, 20, time.Second, time.Second - 1, false}, + }, + concurrencyLimit: 3, + evalDuration: time.Second * 20, + expectFair: []bool{true}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) +} + +func TestWindup(t *testing.T) { + metrics.Register() + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + qCfg := fq.QueuingConfig{ + Name: "TestWindup", + DesiredNumQueues: 9, + QueueLengthLimit: 6, + HandSize: 1, + RequestWaitLimit: 10 * time.Minute, + } + qsc, err := qsf.BeginConstruction(qCfg) + if err != nil { + t.Fatal(err) + } + qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3}) + + uniformScenario{name: qCfg.Name, qs: qs, + clients: []uniformClient{ + {1001001001, 2, 40, time.Second, -1, false}, + {2002002002, 2, 40, time.Second, -1, true}, + }, + concurrencyLimit: 3, + evalDuration: time.Second * 40, + expectFair: []bool{true, false}, + expectAllRequests: true, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + clk: clk, + counter: counter, + }.exercise(t) } func TestDifferentFlowsWithoutQueuing(t *testing.T) { @@ -286,20 +574,20 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) - exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ - {1001001001, 6, 10, time.Second, 57 * time.Millisecond}, - {2002002002, 4, 15, time.Second, 750 * time.Millisecond}, - }, time.Second*13, false, false, false, true, "concurrency-limit", clk, counter) - err = metrics.GatherAndCompare(` - # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system - # TYPE apiserver_flowcontrol_rejected_requests_total counter - apiserver_flowcontrol_rejected_requests_total{flowSchema="client0",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 2 - apiserver_flowcontrol_rejected_requests_total{flowSchema="client1",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 4 - `, - "apiserver_flowcontrol_rejected_requests_total") - if err != nil { - t.Fatal(err) - } + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + {1001001001, 6, 10, time.Second, 57 * time.Millisecond, false}, + {2002002002, 4, 15, time.Second, 750 * time.Millisecond, false}, + }, + concurrencyLimit: 4, + evalDuration: time.Second * 13, + expectFair: []bool{false}, + evalExecutingMetrics: true, + rejectReason: "concurrency-limit", + clk: clk, + counter: counter, + }.exercise(t) } func TestTimeout(t *testing.T) { @@ -321,9 +609,20 @@ func TestTimeout(t *testing.T) { } qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) - exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ - {1001001001, 5, 100, time.Second, time.Second}, - }, time.Second*10, true, false, true, true, "time-out", clk, counter) + uniformScenario{name: qCfg.Name, + qs: qs, + clients: []uniformClient{ + {1001001001, 5, 100, time.Second, time.Second, false}, + }, + concurrencyLimit: 1, + evalDuration: time.Second * 10, + expectFair: []bool{true}, + evalInqueueMetrics: true, + evalExecutingMetrics: true, + rejectReason: "time-out", + clk: clk, + counter: counter, + }.exercise(t) } func TestContextCancel(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go index 22c75b7f253..e7ab1532242 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go @@ -32,6 +32,7 @@ type Integrator interface { Set(float64) // set the value of X Add(float64) // add the given quantity to X GetResults() IntegratorResults + Reset() IntegratorResults // restart the integration from now } // IntegratorResults holds statistical abstracts of the integration @@ -83,6 +84,10 @@ func (igr *integrator) updateLocked() { func (igr *integrator) GetResults() (results IntegratorResults) { igr.Lock() defer func() { igr.Unlock() }() + return igr.getResultsLocked() +} + +func (igr *integrator) getResultsLocked() (results IntegratorResults) { igr.updateLocked() results.Duration = igr.integrals[0] if results.Duration <= 0 { @@ -101,3 +106,11 @@ func (igr *integrator) GetResults() (results IntegratorResults) { } return } + +func (igr *integrator) Reset() (results IntegratorResults) { + igr.Lock() + defer func() { igr.Unlock() }() + results = igr.getResultsLocked() + igr.integrals = [3]float64{0, 0, 0} + return +}