Fix queued request accounting, extended queueset test

This commit is contained in:
Mike Spreitzer 2020-03-05 15:13:46 -05:00
parent f535a9c9ed
commit 8a1b603209
2 changed files with 38 additions and 14 deletions

View File

@ -460,6 +460,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
req.decision.SetLocked(decisionReject) req.decision.SetLocked(decisionReject)
// get index for timed out requests // get index for timed out requests
timeoutIdx = i timeoutIdx = i
metrics.ChangeRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
} else { } else {
break break
} }
@ -472,7 +473,6 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
queue.requests = reqs[removeIdx:] queue.requests = reqs[removeIdx:]
// decrement the # of requestsEnqueued // decrement the # of requestsEnqueued
qs.totRequestsWaiting -= removeIdx qs.totRequestsWaiting -= removeIdx
metrics.ChangeRequestsInQueues(qs.qCfg.Name, fsName, -removeIdx)
} }
} }

View File

@ -56,6 +56,7 @@ type uniformClient struct {
func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario, func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario,
evalDuration time.Duration, evalDuration time.Duration,
expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool, expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool,
rejectReason string,
clk *clock.FakeEventClock, counter counter.GoRoutineCounter) { clk *clock.FakeEventClock, counter counter.GoRoutineCounter) {
now := time.Now() now := time.Now()
@ -68,6 +69,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
metrics.Reset() metrics.Reset()
} }
executions := make([]int32, len(sc)) executions := make([]int32, len(sc))
rejects := make([]int32, len(sc))
for i, uc := range sc { for i, uc := range sc {
integrators[i] = test.NewIntegrator(clk) integrators[i] = test.NewIntegrator(clk)
fsName := fmt.Sprintf("client%d", i) fsName := fmt.Sprintf("client%d", i)
@ -81,6 +83,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle) t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle)
if req == nil { if req == nil {
atomic.AddUint64(&failedCount, 1) atomic.AddUint64(&failedCount, 1)
atomic.AddInt32(&rejects[i], 1)
break break
} }
if idle { if idle {
@ -98,6 +101,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2) t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2)
if !executed { if !executed {
atomic.AddUint64(&failedCount, 1) atomic.AddUint64(&failedCount, 1)
atomic.AddInt32(&rejects[i], 1)
} }
} }
counter.Add(-1) counter.Add(-1)
@ -137,29 +141,49 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
t.Errorf("Expected failed requests but all requests succeeded") t.Errorf("Expected failed requests but all requests succeeded")
} }
if expectInqueueMetrics { if expectInqueueMetrics {
err := metrics.GatherAndCompare(` e := `
# HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system # HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system
# TYPE apiserver_flowcontrol_current_inqueue_requests gauge # TYPE apiserver_flowcontrol_current_inqueue_requests gauge
`+expectedInqueue, ` + expectedInqueue
"apiserver_flowcontrol_current_inqueue_requests") err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests")
if err != nil { if err != nil {
t.Fatal(err) t.Error(err)
} else {
t.Log("Success with" + e)
} }
} }
expectedRejects := ""
for i := range sc { for i := range sc {
fsName := fmt.Sprintf("client%d", i) fsName := fmt.Sprintf("client%d", i)
if atomic.AddInt32(&executions[i], 0) > 0 { if atomic.AddInt32(&executions[i], 0) > 0 {
expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n") expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n")
} }
if atomic.AddInt32(&rejects[i], 0) > 0 {
expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, name, rejectReason, rejects[i], "\n")
}
} }
if expectExecutingMetrics && len(expectedExecuting) > 0 { if expectExecutingMetrics && len(expectedExecuting) > 0 {
err := metrics.GatherAndCompare(` e := `
# HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system # HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system
# TYPE apiserver_flowcontrol_current_executing_requests gauge # TYPE apiserver_flowcontrol_current_executing_requests gauge
`+expectedExecuting, ` + expectedExecuting
"apiserver_flowcontrol_current_executing_requests") err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests")
if err != nil { if err != nil {
t.Fatal(err) t.Error(err)
} else {
t.Log("Success with" + e)
}
}
if expectExecutingMetrics && len(expectedRejects) > 0 {
e := `
# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
# TYPE apiserver_flowcontrol_rejected_requests_total counter
` + expectedRejects
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total")
if err != nil {
t.Error(err)
} else {
t.Log("Success with" + e)
} }
} }
} }
@ -193,7 +217,7 @@ func TestNoRestraint(t *testing.T) {
exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second}, {1001001001, 5, 10, time.Second, time.Second},
{2002002002, 2, 10, time.Second, time.Second / 2}, {2002002002, 2, 10, time.Second, time.Second / 2},
}, time.Second*10, false, true, false, false, clk, counter) }, time.Second*10, false, true, false, false, "", clk, counter)
} }
func TestUniformFlows(t *testing.T) { func TestUniformFlows(t *testing.T) {
@ -218,7 +242,7 @@ func TestUniformFlows(t *testing.T) {
exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second}, {1001001001, 5, 10, time.Second, time.Second},
{2002002002, 5, 10, time.Second, time.Second}, {2002002002, 5, 10, time.Second, time.Second},
}, time.Second*20, true, true, true, true, clk, counter) }, time.Second*20, true, true, true, true, "", clk, counter)
} }
func TestDifferentFlows(t *testing.T) { func TestDifferentFlows(t *testing.T) {
@ -243,7 +267,7 @@ func TestDifferentFlows(t *testing.T) {
exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
{1001001001, 6, 10, time.Second, time.Second}, {1001001001, 6, 10, time.Second, time.Second},
{2002002002, 5, 15, time.Second, time.Second / 2}, {2002002002, 5, 15, time.Second, time.Second / 2},
}, time.Second*20, true, true, true, true, clk, counter) }, time.Second*20, true, true, true, true, "", clk, counter)
} }
func TestDifferentFlowsWithoutQueuing(t *testing.T) { func TestDifferentFlowsWithoutQueuing(t *testing.T) {
@ -265,7 +289,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
{1001001001, 6, 10, time.Second, 57 * time.Millisecond}, {1001001001, 6, 10, time.Second, 57 * time.Millisecond},
{2002002002, 4, 15, time.Second, 750 * time.Millisecond}, {2002002002, 4, 15, time.Second, 750 * time.Millisecond},
}, time.Second*13, false, false, false, true, clk, counter) }, time.Second*13, false, false, false, true, "concurrency-limit", clk, counter)
err = metrics.GatherAndCompare(` err = metrics.GatherAndCompare(`
# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
# TYPE apiserver_flowcontrol_rejected_requests_total counter # TYPE apiserver_flowcontrol_rejected_requests_total counter
@ -299,7 +323,7 @@ func TestTimeout(t *testing.T) {
exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
{1001001001, 5, 100, time.Second, time.Second}, {1001001001, 5, 100, time.Second, time.Second},
}, time.Second*10, true, false, true, true, clk, counter) }, time.Second*10, true, false, true, true, "time-out", clk, counter)
} }
func TestContextCancel(t *testing.T) { func TestContextCancel(t *testing.T) {