apiserver: add flow control metric current_inqueue_seats

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
This commit is contained in:
Andrew Sy Kim 2023-07-24 19:40:05 +00:00
parent 6e879bbaa8
commit fb9646fd60
3 changed files with 41 additions and 9 deletions

View File

@ -448,6 +448,7 @@ func (req *request) wait() (bool, bool) {
qs.totRequestsCancelled++ qs.totRequestsCancelled++
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled") metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats())
req.NoteQueued(false) req.NoteQueued(false)
qs.reqsGaugePair.RequestsWaiting.Add(-1) qs.reqsGaugePair.RequestsWaiting.Add(-1)
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
@ -652,6 +653,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
disqueueSeats += req.MaxSeats() disqueueSeats += req.MaxSeats()
req.NoteQueued(false) req.NoteQueued(false)
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
metrics.AddSeatsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -req.MaxSeats())
} }
// we need to check if the next request has timed out. // we need to check if the next request has timed out.
return true return true
@ -702,6 +704,7 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) {
qs.totRequestsWaiting++ qs.totRequestsWaiting++
qs.totSeatsWaiting += request.MaxSeats() qs.totSeatsWaiting += request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
metrics.AddSeatsInQueues(request.ctx, qs.qCfg.Name, request.fsName, request.MaxSeats())
request.NoteQueued(true) request.NoteQueued(true)
qs.reqsGaugePair.RequestsWaiting.Add(1) qs.reqsGaugePair.RequestsWaiting.Add(1)
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting)) qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
@ -760,6 +763,7 @@ func (qs *queueSet) dispatchLocked() bool {
qs.totRequestsWaiting-- qs.totRequestsWaiting--
qs.totSeatsWaiting -= request.MaxSeats() qs.totSeatsWaiting -= request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
metrics.AddSeatsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -request.MaxSeats())
request.NoteQueued(false) request.NoteQueued(false)
qs.reqsGaugePair.RequestsWaiting.Add(-1) qs.reqsGaugePair.RequestsWaiting.Add(-1)
defer qs.boundNextDispatchLocked(queue) defer qs.boundNextDispatchLocked(queue)

View File

@ -209,13 +209,13 @@ func (us uniformScenario) exercise(t *testing.T) {
type uniformScenarioState struct { type uniformScenarioState struct {
t *testing.T t *testing.T
uniformScenario uniformScenario
startTime time.Time startTime time.Time
doSplit bool doSplit bool
execSeatsIntegrators []fq.Integrator execSeatsIntegrators []fq.Integrator
seatDemandIntegratorCheck fq.Integrator seatDemandIntegratorCheck fq.Integrator
failedCount uint64 failedCount uint64
expectedInqueue, expectedExecuting, expectedConcurrencyInUse string expectedInqueueReqs, expectedInqueueSeats, expectedExecuting, expectedConcurrencyInUse string
executions, rejects []int32 executions, rejects []int32
} }
func (uss *uniformScenarioState) exercise() { func (uss *uniformScenarioState) exercise() {
@ -226,7 +226,8 @@ func (uss *uniformScenarioState) exercise() {
for i, uc := range uss.clients { for i, uc := range uss.clients {
uss.execSeatsIntegrators[i] = fq.NewNamedIntegrator(uss.clk, fmt.Sprintf("%s client %d execSeats", uss.name, i)) uss.execSeatsIntegrators[i] = fq.NewNamedIntegrator(uss.clk, fmt.Sprintf("%s client %d execSeats", uss.name, i))
fsName := fmt.Sprintf("client%d", i) fsName := fmt.Sprintf("client%d", i)
uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") uss.expectedInqueueReqs = uss.expectedInqueueReqs + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
uss.expectedInqueueSeats = uss.expectedInqueueSeats + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_seats{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
for j := 0; j < uc.nThreads; j++ { for j := 0; j < uc.nThreads; j++ {
ust := uniformScenarioThread{ ust := uniformScenarioThread{
uss: uss, uss: uss,
@ -412,13 +413,24 @@ func (uss *uniformScenarioState) finalReview() {
e := ` e := `
# HELP apiserver_flowcontrol_current_inqueue_requests [BETA] Number of requests currently pending in queues of the API Priority and Fairness subsystem # HELP apiserver_flowcontrol_current_inqueue_requests [BETA] Number of requests currently pending in queues of the API Priority and Fairness subsystem
# TYPE apiserver_flowcontrol_current_inqueue_requests gauge # TYPE apiserver_flowcontrol_current_inqueue_requests gauge
` + uss.expectedInqueue ` + uss.expectedInqueueReqs
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests") err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests")
if err != nil { if err != nil {
uss.t.Error(err) uss.t.Error(err)
} else { } else {
uss.t.Log("Success with" + e) uss.t.Log("Success with" + e)
} }
e = `
# HELP apiserver_flowcontrol_current_inqueue_seats [ALPHA] Number of seats currently pending in queues of the API Priority and Fairness subsystem
# TYPE apiserver_flowcontrol_current_inqueue_seats gauge
` + uss.expectedInqueueSeats
err = metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_seats")
if err != nil {
uss.t.Error(err)
} else {
uss.t.Log("Success with" + e)
}
} }
expectedRejects := "" expectedRejects := ""
for i := range uss.clients { for i := range uss.clients {

View File

@ -210,6 +210,16 @@ var (
}, },
[]string{priorityLevel, flowSchema}, []string{priorityLevel, flowSchema},
) )
apiserverCurrentInqueueSeats = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "current_inqueue_seats",
Help: "Number of seats currently pending in queues of the API Priority and Fairness subsystem",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{priorityLevel, flowSchema},
)
apiserverRequestQueueLength = compbasemetrics.NewHistogramVec( apiserverRequestQueueLength = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{ &compbasemetrics.HistogramOpts{
Namespace: namespace, Namespace: namespace,
@ -455,6 +465,7 @@ var (
apiserverNextSBounds, apiserverNextSBounds,
apiserverNextDiscountedSBounds, apiserverNextDiscountedSBounds,
apiserverCurrentInqueueRequests, apiserverCurrentInqueueRequests,
apiserverCurrentInqueueSeats,
apiserverRequestQueueLength, apiserverRequestQueueLength,
apiserverRequestConcurrencyLimit, apiserverRequestConcurrencyLimit,
apiserverRequestConcurrencyInUse, apiserverRequestConcurrencyInUse,
@ -518,6 +529,11 @@ func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string,
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
} }
// AddSeatsInQueues adds the given delta to the gauge of the # of seats in the queues of the specified flowSchema and priorityLevel
func AddSeatsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
apiserverCurrentInqueueSeats.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
}
// AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel // AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel
func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) { func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) {
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))