From 8a1b60320986eca05cb281bcce45332e0969268e Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Thu, 5 Mar 2020 15:13:46 -0500 Subject: [PATCH] Fix queued request accounting, extended queueset test --- .../fairqueuing/queueset/queueset.go | 2 +- .../fairqueuing/queueset/queueset_test.go | 50 ++++++++++++++----- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 8f4db56dd78..3c64bdcb5c8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -460,6 +460,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s req.decision.SetLocked(decisionReject) // get index for timed out requests timeoutIdx = i + metrics.ChangeRequestsInQueues(qs.qCfg.Name, req.fsName, -1) } else { break } @@ -472,7 +473,6 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s queue.requests = reqs[removeIdx:] // decrement the # of requestsEnqueued qs.totRequestsWaiting -= removeIdx - metrics.ChangeRequestsInQueues(qs.qCfg.Name, fsName, -removeIdx) } } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 13579179947..984917edb7a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -56,6 +56,7 @@ type uniformClient struct { func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario, evalDuration time.Duration, expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool, + rejectReason string, clk *clock.FakeEventClock, counter counter.GoRoutineCounter) { now := time.Now() @@ -68,6 +69,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, metrics.Reset() } executions := make([]int32, len(sc)) + rejects := make([]int32, len(sc)) for i, uc := range sc { integrators[i] = test.NewIntegrator(clk) fsName := fmt.Sprintf("client%d", i) @@ -81,6 +83,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle) if req == nil { atomic.AddUint64(&failedCount, 1) + atomic.AddInt32(&rejects[i], 1) break } if idle { @@ -98,6 +101,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2) if !executed { atomic.AddUint64(&failedCount, 1) + atomic.AddInt32(&rejects[i], 1) } } counter.Add(-1) @@ -137,29 +141,49 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, t.Errorf("Expected failed requests but all requests succeeded") } if expectInqueueMetrics { - err := metrics.GatherAndCompare(` + e := ` # HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system # TYPE apiserver_flowcontrol_current_inqueue_requests gauge -`+expectedInqueue, - "apiserver_flowcontrol_current_inqueue_requests") +` + expectedInqueue + err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests") if err != nil { - t.Fatal(err) + t.Error(err) + } else { + t.Log("Success with" + e) } } + expectedRejects := "" for i := range sc { fsName := fmt.Sprintf("client%d", i) if atomic.AddInt32(&executions[i], 0) > 0 { expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n") } + if atomic.AddInt32(&rejects[i], 0) > 0 { + expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, name, rejectReason, rejects[i], "\n") + } } if expectExecutingMetrics && len(expectedExecuting) > 0 { - err := metrics.GatherAndCompare(` + e := ` # HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system # TYPE apiserver_flowcontrol_current_executing_requests gauge -`+expectedExecuting, - "apiserver_flowcontrol_current_executing_requests") +` + expectedExecuting + err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests") if err != nil { - t.Fatal(err) + t.Error(err) + } else { + t.Log("Success with" + e) + } + } + if expectExecutingMetrics && len(expectedRejects) > 0 { + e := ` + # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system + # TYPE apiserver_flowcontrol_rejected_requests_total counter +` + expectedRejects + err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total") + if err != nil { + t.Error(err) + } else { + t.Log("Success with" + e) } } } @@ -193,7 +217,7 @@ func TestNoRestraint(t *testing.T) { exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 2, 10, time.Second, time.Second / 2}, - }, time.Second*10, false, true, false, false, clk, counter) + }, time.Second*10, false, true, false, false, "", clk, counter) } func TestUniformFlows(t *testing.T) { @@ -218,7 +242,7 @@ func TestUniformFlows(t *testing.T) { exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 5, 10, time.Second, time.Second}, - }, time.Second*20, true, true, true, true, clk, counter) + }, time.Second*20, true, true, true, true, "", clk, counter) } func TestDifferentFlows(t *testing.T) { @@ -243,7 +267,7 @@ func TestDifferentFlows(t *testing.T) { exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 6, 10, time.Second, time.Second}, {2002002002, 5, 15, time.Second, time.Second / 2}, - }, time.Second*20, true, true, true, true, clk, counter) + }, time.Second*20, true, true, true, true, "", clk, counter) } func TestDifferentFlowsWithoutQueuing(t *testing.T) { @@ -265,7 +289,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 6, 10, time.Second, 57 * time.Millisecond}, {2002002002, 4, 15, time.Second, 750 * time.Millisecond}, - }, time.Second*13, false, false, false, true, clk, counter) + }, time.Second*13, false, false, false, true, "concurrency-limit", clk, counter) err = metrics.GatherAndCompare(` # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system # TYPE apiserver_flowcontrol_rejected_requests_total counter @@ -299,7 +323,7 @@ func TestTimeout(t *testing.T) { exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{ {1001001001, 5, 100, time.Second, time.Second}, - }, time.Second*10, true, false, true, true, clk, counter) + }, time.Second*10, true, false, true, true, "time-out", clk, counter) } func TestContextCancel(t *testing.T) {