mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #102795 from tkashem/apf-width-metric
apf: add a gauge to show the number of seats currently in use
This commit is contained in:
commit
fddb3adcfd
@ -593,6 +593,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flo
|
|||||||
qs.totRequestsExecuting++
|
qs.totRequestsExecuting++
|
||||||
qs.totSeatsInUse += req.Seats()
|
qs.totSeatsInUse += req.Seats()
|
||||||
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
|
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
|
||||||
|
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats())
|
||||||
qs.obsPair.RequestsExecuting.Add(1)
|
qs.obsPair.RequestsExecuting.Add(1)
|
||||||
if klog.V(5).Enabled() {
|
if klog.V(5).Enabled() {
|
||||||
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
|
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
|
||||||
@ -627,6 +628,7 @@ func (qs *queueSet) dispatchLocked() bool {
|
|||||||
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
|
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
|
||||||
request.NoteQueued(false)
|
request.NoteQueued(false)
|
||||||
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
|
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
|
||||||
|
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.Seats())
|
||||||
qs.obsPair.RequestsWaiting.Add(-1)
|
qs.obsPair.RequestsWaiting.Add(-1)
|
||||||
qs.obsPair.RequestsExecuting.Add(1)
|
qs.obsPair.RequestsExecuting.Add(1)
|
||||||
if klog.V(6).Enabled() {
|
if klog.V(6).Enabled() {
|
||||||
@ -727,6 +729,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
|
|||||||
qs.totRequestsExecuting--
|
qs.totRequestsExecuting--
|
||||||
qs.totSeatsInUse -= r.Seats()
|
qs.totSeatsInUse -= r.Seats()
|
||||||
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
|
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
|
||||||
|
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
|
||||||
qs.obsPair.RequestsExecuting.Add(-1)
|
qs.obsPair.RequestsExecuting.Add(-1)
|
||||||
|
|
||||||
if r.queue == nil {
|
if r.queue == nil {
|
||||||
|
@ -153,12 +153,12 @@ func (us uniformScenario) exercise(t *testing.T) {
|
|||||||
type uniformScenarioState struct {
|
type uniformScenarioState struct {
|
||||||
t *testing.T
|
t *testing.T
|
||||||
uniformScenario
|
uniformScenario
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
doSplit bool
|
doSplit bool
|
||||||
integrators []fq.Integrator
|
integrators []fq.Integrator
|
||||||
failedCount uint64
|
failedCount uint64
|
||||||
expectedInqueue, expectedExecuting string
|
expectedInqueue, expectedExecuting, expectedConcurrencyInUse string
|
||||||
executions, rejects []int32
|
executions, rejects []int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uss *uniformScenarioState) exercise() {
|
func (uss *uniformScenarioState) exercise() {
|
||||||
@ -313,6 +313,7 @@ func (uss *uniformScenarioState) finalReview() {
|
|||||||
fsName := fmt.Sprintf("client%d", i)
|
fsName := fmt.Sprintf("client%d", i)
|
||||||
if atomic.AddInt32(&uss.executions[i], 0) > 0 {
|
if atomic.AddInt32(&uss.executions[i], 0) > 0 {
|
||||||
uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
|
uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
|
||||||
|
uss.expectedConcurrencyInUse = uss.expectedConcurrencyInUse + fmt.Sprintf(` apiserver_flowcontrol_request_concurrency_in_use{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
|
||||||
}
|
}
|
||||||
if atomic.AddInt32(&uss.rejects[i], 0) > 0 {
|
if atomic.AddInt32(&uss.rejects[i], 0) > 0 {
|
||||||
expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flow_schema=%q,priority_level=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n")
|
expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flow_schema=%q,priority_level=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n")
|
||||||
@ -330,6 +331,18 @@ func (uss *uniformScenarioState) finalReview() {
|
|||||||
uss.t.Log("Success with" + e)
|
uss.t.Log("Success with" + e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if uss.evalExecutingMetrics && len(uss.expectedConcurrencyInUse) > 0 {
|
||||||
|
e := `
|
||||||
|
# HELP apiserver_flowcontrol_request_concurrency_in_use [ALPHA] Concurrency (number of seats) occupided by the currently executing requests in the API Priority and Fairness system
|
||||||
|
# TYPE apiserver_flowcontrol_request_concurrency_in_use gauge
|
||||||
|
` + uss.expectedConcurrencyInUse
|
||||||
|
err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_request_concurrency_in_use")
|
||||||
|
if err != nil {
|
||||||
|
uss.t.Error(err)
|
||||||
|
} else {
|
||||||
|
uss.t.Log("Success with" + e)
|
||||||
|
}
|
||||||
|
}
|
||||||
if uss.evalExecutingMetrics && len(expectedRejects) > 0 {
|
if uss.evalExecutingMetrics && len(expectedRejects) > 0 {
|
||||||
e := `
|
e := `
|
||||||
# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
|
# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
|
||||||
|
@ -185,6 +185,16 @@ var (
|
|||||||
},
|
},
|
||||||
[]string{priorityLevel, flowSchema},
|
[]string{priorityLevel, flowSchema},
|
||||||
)
|
)
|
||||||
|
apiserverRequestConcurrencyInUse = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "request_concurrency_in_use",
|
||||||
|
Help: "Concurrency (number of seats) occupided by the currently executing requests in the API Priority and Fairness system",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel, flowSchema},
|
||||||
|
)
|
||||||
apiserverRequestWaitingSeconds = compbasemetrics.NewHistogramVec(
|
apiserverRequestWaitingSeconds = compbasemetrics.NewHistogramVec(
|
||||||
&compbasemetrics.HistogramOpts{
|
&compbasemetrics.HistogramOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
@ -213,6 +223,7 @@ var (
|
|||||||
apiserverCurrentInqueueRequests,
|
apiserverCurrentInqueueRequests,
|
||||||
apiserverRequestQueueLength,
|
apiserverRequestQueueLength,
|
||||||
apiserverRequestConcurrencyLimit,
|
apiserverRequestConcurrencyLimit,
|
||||||
|
apiserverRequestConcurrencyInUse,
|
||||||
apiserverCurrentExecutingRequests,
|
apiserverCurrentExecutingRequests,
|
||||||
apiserverRequestWaitingSeconds,
|
apiserverRequestWaitingSeconds,
|
||||||
apiserverRequestExecutionSeconds,
|
apiserverRequestExecutionSeconds,
|
||||||
@ -231,6 +242,12 @@ func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string,
|
|||||||
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
|
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddRequestConcurrencyInUse adds the given delta to the gauge of concurrency in use by
|
||||||
|
// the currently executing requests of the given flowSchema and priorityLevel
|
||||||
|
func AddRequestConcurrencyInUse(priorityLevel, flowSchema string, delta int) {
|
||||||
|
apiserverRequestConcurrencyInUse.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control
|
// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control
|
||||||
func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
|
func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
|
||||||
apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit))
|
apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit))
|
||||||
|
Loading…
Reference in New Issue
Block a user