From c710f99ef730a791a6911e63cc3b9d26cced6bd3 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Thu, 10 Jun 2021 17:34:50 -0400 Subject: [PATCH] apf: add a gauge for the number of seats currently in use --- .../fairqueuing/queueset/queueset.go | 3 +++ .../fairqueuing/queueset/queueset_test.go | 25 ++++++++++++++----- .../pkg/util/flowcontrol/metrics/metrics.go | 17 +++++++++++++ 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index d55bc40f549..cf975f304bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -593,6 +593,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flo qs.totRequestsExecuting++ qs.totSeatsInUse += req.Seats() metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) + metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) qs.obsPair.RequestsExecuting.Add(1) if klog.V(5).Enabled() { klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) @@ -627,6 +628,7 @@ func (qs *queueSet) dispatchLocked() bool { metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) request.NoteQueued(false) metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) + metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.Seats()) qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { @@ -727,6 +729,7 @@ func (qs *queueSet) finishRequestLocked(r *request) { qs.totRequestsExecuting-- qs.totSeatsInUse -= r.Seats() metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) + metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) qs.obsPair.RequestsExecuting.Add(-1) if r.queue == nil { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 0779a4ffaca..7790179bd74 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -153,12 +153,12 @@ func (us uniformScenario) exercise(t *testing.T) { type uniformScenarioState struct { t *testing.T uniformScenario - startTime time.Time - doSplit bool - integrators []fq.Integrator - failedCount uint64 - expectedInqueue, expectedExecuting string - executions, rejects []int32 + startTime time.Time + doSplit bool + integrators []fq.Integrator + failedCount uint64 + expectedInqueue, expectedExecuting, expectedConcurrencyInUse string + executions, rejects []int32 } func (uss *uniformScenarioState) exercise() { @@ -313,6 +313,7 @@ func (uss *uniformScenarioState) finalReview() { fsName := fmt.Sprintf("client%d", i) if atomic.AddInt32(&uss.executions[i], 0) > 0 { uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") + uss.expectedConcurrencyInUse = uss.expectedConcurrencyInUse + fmt.Sprintf(` apiserver_flowcontrol_request_concurrency_in_use{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") } if atomic.AddInt32(&uss.rejects[i], 0) > 0 { expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flow_schema=%q,priority_level=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n") @@ -330,6 +331,18 @@ func (uss *uniformScenarioState) finalReview() { uss.t.Log("Success with" + e) } } + if uss.evalExecutingMetrics && len(uss.expectedConcurrencyInUse) > 0 { + e := ` + # HELP apiserver_flowcontrol_request_concurrency_in_use [ALPHA] Concurrency (number of seats) occupided by the currently executing requests in the API Priority and Fairness system + # TYPE apiserver_flowcontrol_request_concurrency_in_use gauge +` + uss.expectedConcurrencyInUse + err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_request_concurrency_in_use") + if err != nil { + uss.t.Error(err) + } else { + uss.t.Log("Success with" + e) + } + } if uss.evalExecutingMetrics && len(expectedRejects) > 0 { e := ` # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index 4ebe85577ba..88b2f13c7c4 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -185,6 +185,16 @@ var ( }, []string{priorityLevel, flowSchema}, ) + apiserverRequestConcurrencyInUse = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_concurrency_in_use", + Help: "Concurrency (number of seats) occupided by the currently executing requests in the API Priority and Fairness system", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{priorityLevel, flowSchema}, + ) apiserverRequestWaitingSeconds = compbasemetrics.NewHistogramVec( &compbasemetrics.HistogramOpts{ Namespace: namespace, @@ -213,6 +223,7 @@ var ( apiserverCurrentInqueueRequests, apiserverRequestQueueLength, apiserverRequestConcurrencyLimit, + apiserverRequestConcurrencyInUse, apiserverCurrentExecutingRequests, apiserverRequestWaitingSeconds, apiserverRequestExecutionSeconds, @@ -231,6 +242,12 @@ func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } +// AddRequestConcurrencyInUse adds the given delta to the gauge of concurrency in use by +// the currently executing requests of the given flowSchema and priorityLevel +func AddRequestConcurrencyInUse(priorityLevel, flowSchema string, delta int) { + apiserverRequestConcurrencyInUse.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) +} + // UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) { apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit))