mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 10:20:51 +00:00
Merge pull request #113483 from MikeSpreitzer/add-borrowing-metrics
Define metrics for API Priority and Fairness borrowing
This commit is contained in:
commit
7ae0396666
@ -216,6 +216,9 @@ type priorityLevelState struct {
|
|||||||
// Integrator of seat demand, reset every CurrentCL adjustment period
|
// Integrator of seat demand, reset every CurrentCL adjustment period
|
||||||
seatDemandIntegrator fq.Integrator
|
seatDemandIntegrator fq.Integrator
|
||||||
|
|
||||||
|
// Gauge of seat demand / nominalCL
|
||||||
|
seatDemandRatioedGauge metrics.RatioedGauge
|
||||||
|
|
||||||
// seatDemandStats is derived from periodically examining the seatDemandIntegrator.
|
// seatDemandStats is derived from periodically examining the seatDemandIntegrator.
|
||||||
// The average, standard deviation, and high watermark come directly from the integrator.
|
// The average, standard deviation, and high watermark come directly from the integrator.
|
||||||
// envelope = avg + stdDev.
|
// envelope = avg + stdDev.
|
||||||
@ -368,7 +371,7 @@ func (cfgCtlr *configController) updateBorrowing() {
|
|||||||
for _, plState := range cfgCtlr.priorityLevelStates {
|
for _, plState := range cfgCtlr.priorityLevelStates {
|
||||||
obs := plState.seatDemandIntegrator.Reset()
|
obs := plState.seatDemandIntegrator.Reset()
|
||||||
plState.seatDemandStats.update(obs)
|
plState.seatDemandStats.update(obs)
|
||||||
// TODO: set borrowing metrics introduced in https://github.com/kubernetes/enhancements/pull/3391
|
metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed /* TODO: add the designed rest for borrowing */)
|
||||||
// TODO: updathe CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching
|
// TODO: updathe CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -599,9 +602,10 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
|
|||||||
reqsGaugePair: metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues),
|
reqsGaugePair: metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues),
|
||||||
execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues),
|
execSeatsObs: meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues),
|
||||||
seatDemandIntegrator: fq.NewNamedIntegrator(meal.cfgCtlr.clock, pl.Name),
|
seatDemandIntegrator: fq.NewNamedIntegrator(meal.cfgCtlr.clock, pl.Name),
|
||||||
|
seatDemandRatioedGauge: metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{pl.Name}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, state.seatDemandIntegrator)
|
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
|
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
|
||||||
continue
|
continue
|
||||||
@ -705,7 +709,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, plState.seatDemandIntegrator)
|
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This can not happen because queueSetCompleterForPL already approved this config
|
// This can not happen because queueSetCompleterForPL already approved this config
|
||||||
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
|
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
|
||||||
@ -740,6 +744,8 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
|
|||||||
// difference will be negligible.
|
// difference will be negligible.
|
||||||
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.NominalConcurrencyShares) / meal.shareSum))
|
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.NominalConcurrencyShares) / meal.shareSum))
|
||||||
metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
|
metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
|
||||||
|
metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit /* TODO: pass min and max once new API is available */, concurrencyLimit, concurrencyLimit)
|
||||||
|
plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyLimit))
|
||||||
meal.maxExecutingRequests += concurrencyLimit
|
meal.maxExecutingRequests += concurrencyLimit
|
||||||
var waitLimit int
|
var waitLimit int
|
||||||
if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
|
if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
|
||||||
@ -762,7 +768,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
|
|||||||
// given priority level configuration. Returns nil if that config
|
// given priority level configuration. Returns nil if that config
|
||||||
// does not call for limiting. Returns nil and an error if the given
|
// does not call for limiting. Returns nil and an error if the given
|
||||||
// object is malformed in a way that is a problem for this package.
|
// object is malformed in a way that is a problem for this package.
|
||||||
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandIntgrator fq.Integrator) (fq.QueueSetCompleter, error) {
|
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
|
||||||
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
|
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
|
||||||
return nil, errors.New("broken union structure at the top")
|
return nil, errors.New("broken union structure at the top")
|
||||||
}
|
}
|
||||||
@ -791,7 +797,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
|
|||||||
if queues != nil {
|
if queues != nil {
|
||||||
qsc, err = queues.BeginConfigChange(qcQS)
|
qsc, err = queues.BeginConfigChange(qcQS)
|
||||||
} else {
|
} else {
|
||||||
qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandIntgrator)
|
qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandGauge)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
|
err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
|
||||||
@ -840,7 +846,8 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
|
|||||||
reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
|
reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
|
||||||
execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
|
execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
|
||||||
seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
|
seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
|
||||||
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, seatDemandIntegrator)
|
seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name})
|
||||||
|
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This can not happen because proto is one of the mandatory
|
// This can not happen because proto is one of the mandatory
|
||||||
// objects and these are not erroneous
|
// objects and these are not erroneous
|
||||||
@ -852,6 +859,7 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
|
|||||||
reqsGaugePair: reqsGaugePair,
|
reqsGaugePair: reqsGaugePair,
|
||||||
execSeatsObs: execSeatsObs,
|
execSeatsObs: execSeatsObs,
|
||||||
seatDemandIntegrator: seatDemandIntegrator,
|
seatDemandIntegrator: seatDemandIntegrator,
|
||||||
|
seatDemandRatioedGauge: seatDemandRatioedGauge,
|
||||||
}
|
}
|
||||||
if proto.Spec.Limited != nil {
|
if proto.Spec.Limited != nil {
|
||||||
meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares)
|
meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares)
|
||||||
|
@ -105,7 +105,7 @@ type ctlrTestRequest struct {
|
|||||||
descr1, descr2 interface{}
|
descr1, descr2 interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge, sdi fq.Integrator) (fq.QueueSetCompleter, error) {
|
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge, sdi metrics.Gauge) (fq.QueueSetCompleter, error) {
|
||||||
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
|
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
|
||||||
"k8s.io/utils/clock"
|
"k8s.io/utils/clock"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -29,8 +31,7 @@ import (
|
|||||||
// Integrator is created, and ends at the latest operation on the
|
// Integrator is created, and ends at the latest operation on the
|
||||||
// Integrator.
|
// Integrator.
|
||||||
type Integrator interface {
|
type Integrator interface {
|
||||||
Set(float64)
|
fcmetrics.Gauge
|
||||||
Add(float64)
|
|
||||||
|
|
||||||
GetResults() IntegratorResults
|
GetResults() IntegratorResults
|
||||||
|
|
||||||
@ -77,6 +78,24 @@ func (igr *integrator) Set(x float64) {
|
|||||||
igr.Unlock()
|
igr.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (igr *integrator) Add(deltaX float64) {
|
||||||
|
igr.Lock()
|
||||||
|
igr.setLocked(igr.x + deltaX)
|
||||||
|
igr.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (igr *integrator) Inc() {
|
||||||
|
igr.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (igr *integrator) Dec() {
|
||||||
|
igr.Add(-1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (igr *integrator) SetToCurrentTime() {
|
||||||
|
igr.Set(float64(time.Now().UnixNano()))
|
||||||
|
}
|
||||||
|
|
||||||
func (igr *integrator) setLocked(x float64) {
|
func (igr *integrator) setLocked(x float64) {
|
||||||
igr.updateLocked()
|
igr.updateLocked()
|
||||||
igr.x = x
|
igr.x = x
|
||||||
@ -88,12 +107,6 @@ func (igr *integrator) setLocked(x float64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (igr *integrator) Add(deltaX float64) {
|
|
||||||
igr.Lock()
|
|
||||||
igr.setLocked(igr.x + deltaX)
|
|
||||||
igr.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (igr *integrator) updateLocked() {
|
func (igr *integrator) updateLocked() {
|
||||||
now := igr.clock.Now()
|
now := igr.clock.Now()
|
||||||
dt := now.Sub(igr.lastTime).Seconds()
|
dt := now.Sub(igr.lastTime).Seconds()
|
||||||
|
@ -35,9 +35,8 @@ type QueueSetFactory interface {
|
|||||||
// The RatioedGaugePair observes number of requests,
|
// The RatioedGaugePair observes number of requests,
|
||||||
// execution covering just the regular phase.
|
// execution covering just the regular phase.
|
||||||
// The RatioedGauge observes number of seats occupied through all phases of execution.
|
// The RatioedGauge observes number of seats occupied through all phases of execution.
|
||||||
// The Integrator observes the seat demand (executing + queued seats), and
|
// The Gauge observes the seat demand (executing + queued seats).
|
||||||
// the queueset does not read or reset the Integrator.
|
BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, metrics.Gauge) (QueueSetCompleter, error)
|
||||||
BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, Integrator) (QueueSetCompleter, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueSetCompleter finishes the two-step process of creating or
|
// QueueSetCompleter finishes the two-step process of creating or
|
||||||
|
@ -63,7 +63,7 @@ type queueSetCompleter struct {
|
|||||||
factory *queueSetFactory
|
factory *queueSetFactory
|
||||||
reqsGaugePair metrics.RatioedGaugePair
|
reqsGaugePair metrics.RatioedGaugePair
|
||||||
execSeatsGauge metrics.RatioedGauge
|
execSeatsGauge metrics.RatioedGauge
|
||||||
seatDemandIntegrator fq.Integrator
|
seatDemandIntegrator metrics.Gauge
|
||||||
theSet *queueSet
|
theSet *queueSet
|
||||||
qCfg fq.QueuingConfig
|
qCfg fq.QueuingConfig
|
||||||
dealer *shufflesharding.Dealer
|
dealer *shufflesharding.Dealer
|
||||||
@ -94,7 +94,7 @@ type queueSet struct {
|
|||||||
|
|
||||||
execSeatsGauge metrics.RatioedGauge // for all phases of execution
|
execSeatsGauge metrics.RatioedGauge // for all phases of execution
|
||||||
|
|
||||||
seatDemandIntegrator fq.Integrator
|
seatDemandIntegrator metrics.Gauge
|
||||||
|
|
||||||
promiseFactory promiseFactory
|
promiseFactory promiseFactory
|
||||||
|
|
||||||
@ -163,7 +163,7 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator fq.Integrator) (fq.QueueSetCompleter, error) {
|
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator metrics.Gauge) (fq.QueueSetCompleter, error) {
|
||||||
dealer, err := checkConfig(qCfg)
|
dealer, err := checkConfig(qCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -40,7 +40,7 @@ type noRestraint struct{}
|
|||||||
|
|
||||||
type noRestraintRequest struct{}
|
type noRestraintRequest struct{}
|
||||||
|
|
||||||
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, fq.Integrator) (fq.QueueSetCompleter, error) {
|
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, metrics.Gauge) (fq.QueueSetCompleter, error) {
|
||||||
return noRestraintCompleter{}, nil
|
return noRestraintCompleter{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,12 +65,13 @@ type resettable interface {
|
|||||||
Reset()
|
Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset all metrics to zero
|
// Reset all resettable metrics to zero
|
||||||
func Reset() {
|
func Reset() {
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
rm := metric.(resettable)
|
if rm, ok := metric.(resettable); ok {
|
||||||
rm.Reset()
|
rm.Reset()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GatherAndCompare the given metrics with the given Prometheus syntax expected value
|
// GatherAndCompare the given metrics with the given Prometheus syntax expected value
|
||||||
@ -316,6 +317,120 @@ var (
|
|||||||
},
|
},
|
||||||
[]string{priorityLevel, flowSchema},
|
[]string{priorityLevel, flowSchema},
|
||||||
)
|
)
|
||||||
|
apiserverNominalConcurrencyLimits = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "nominal_limit_seats",
|
||||||
|
Help: "Nominal number of execution seats configured for each priority level",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverMinimumConcurrencyLimits = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "lower_limit_seats",
|
||||||
|
Help: "Configured lower bound on number of execution seats available to each priority level",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverMaximumConcurrencyLimits = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "upper_limit_seats",
|
||||||
|
Help: "Configured upper bound on number of execution seats available to each priority level",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
ApiserverSeatDemands = NewTimingRatioHistogramVec(
|
||||||
|
&compbasemetrics.TimingHistogramOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "demand_seats",
|
||||||
|
Help: "Observations, at the end of every nanosecond, of (the number of seats each priority level could use) / (nominal number of seats for that level)",
|
||||||
|
// Rationale for the bucket boundaries:
|
||||||
|
// For 0--1, evenly spaced and not too many;
|
||||||
|
// For 1--2, roughly powers of sqrt(sqrt(2));
|
||||||
|
// For 2--6, roughly powers of sqrt(2);
|
||||||
|
// We need coverage over 1, but do not want too many buckets.
|
||||||
|
Buckets: []float64{0.2, 0.4, 0.6, 0.8, 1, 1.2, 1.4, 1.7, 2, 2.8, 4, 6},
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
priorityLevel,
|
||||||
|
)
|
||||||
|
apiserverSeatDemandHighWatermarks = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "demand_seats_high_watermark",
|
||||||
|
Help: "High watermark, over last adjustment period, of demand_seats",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverSeatDemandAverages = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "demand_seats_average",
|
||||||
|
Help: "Time-weighted average, over last adjustment period, of demand_seats",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverSeatDemandStandardDeviations = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "demand_seats_stdev",
|
||||||
|
Help: "Time-weighted standard deviation, over last adjustment period, of demand_seats",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverSeatDemandSmootheds = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "demand_seats_smoothed",
|
||||||
|
Help: "Smoothed seat demands",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverSeatDemandTargets = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "target_seats",
|
||||||
|
Help: "Seat allocation targets",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverFairFracs = compbasemetrics.NewGauge(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "seat_fair_frac",
|
||||||
|
Help: "Fair fraction of server's concurrency to allocate to each priority level that can use it",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
})
|
||||||
|
apiserverCurrentConcurrencyLimits = compbasemetrics.NewGaugeVec(
|
||||||
|
&compbasemetrics.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "current_limit_seats",
|
||||||
|
Help: "current derived number of execution seats available to each priority level",
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
|
||||||
metrics = Registerables{
|
metrics = Registerables{
|
||||||
apiserverRejectedRequestsTotal,
|
apiserverRejectedRequestsTotal,
|
||||||
@ -336,10 +451,21 @@ var (
|
|||||||
apiserverEpochAdvances,
|
apiserverEpochAdvances,
|
||||||
apiserverWorkEstimatedSeats,
|
apiserverWorkEstimatedSeats,
|
||||||
apiserverDispatchWithNoAccommodation,
|
apiserverDispatchWithNoAccommodation,
|
||||||
|
apiserverNominalConcurrencyLimits,
|
||||||
|
apiserverMinimumConcurrencyLimits,
|
||||||
|
apiserverMaximumConcurrencyLimits,
|
||||||
|
apiserverSeatDemandHighWatermarks,
|
||||||
|
apiserverSeatDemandAverages,
|
||||||
|
apiserverSeatDemandStandardDeviations,
|
||||||
|
apiserverSeatDemandSmootheds,
|
||||||
|
apiserverSeatDemandTargets,
|
||||||
|
apiserverFairFracs,
|
||||||
|
apiserverCurrentConcurrencyLimits,
|
||||||
}.
|
}.
|
||||||
Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
|
Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
|
||||||
Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
|
Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
|
||||||
Append(readWriteConcurrencyGaugeVec.metrics()...)
|
Append(readWriteConcurrencyGaugeVec.metrics()...).
|
||||||
|
Append(ApiserverSeatDemands.metrics()...)
|
||||||
)
|
)
|
||||||
|
|
||||||
type indexOnce struct {
|
type indexOnce struct {
|
||||||
@ -457,3 +583,23 @@ func ObserveWorkEstimatedSeats(priorityLevel, flowSchema string, seats int) {
|
|||||||
func AddDispatchWithNoAccommodation(priorityLevel, flowSchema string) {
|
func AddDispatchWithNoAccommodation(priorityLevel, flowSchema string) {
|
||||||
apiserverDispatchWithNoAccommodation.WithLabelValues(priorityLevel, flowSchema).Inc()
|
apiserverDispatchWithNoAccommodation.WithLabelValues(priorityLevel, flowSchema).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SetPriorityLevelConfiguration(priorityLevel string, nominalCL, minCL, maxCL int) {
|
||||||
|
apiserverNominalConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(nominalCL))
|
||||||
|
apiserverMinimumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(minCL))
|
||||||
|
apiserverMaximumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(maxCL))
|
||||||
|
}
|
||||||
|
|
||||||
|
func NotePriorityLevelConcurrencyAdjustment(priorityLevel string, seatDemandHWM, seatDemandAvg, seatDemandStdev, seatDemandSmoothed float64 /* TODO: seatDemandTarget float64, currentCL int */) {
|
||||||
|
apiserverSeatDemandHighWatermarks.WithLabelValues(priorityLevel).Set(seatDemandHWM)
|
||||||
|
apiserverSeatDemandAverages.WithLabelValues(priorityLevel).Set(seatDemandAvg)
|
||||||
|
apiserverSeatDemandStandardDeviations.WithLabelValues(priorityLevel).Set(seatDemandStdev)
|
||||||
|
apiserverSeatDemandSmootheds.WithLabelValues(priorityLevel).Set(seatDemandSmoothed)
|
||||||
|
// TODO: the following once new API is available
|
||||||
|
// apiserverSeatDemandTargets.WithLabelValues(priorityLevel).Set(seatDemandTarget)
|
||||||
|
// apiserverCurrentConcurrencyLimits.WithLabelValues(priorityLevel).Set(currentCL)
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetFairFrac(fairFrac float64) {
|
||||||
|
apiserverFairFracs.Set(fairFrac)
|
||||||
|
}
|
||||||
|
@ -0,0 +1,56 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2022 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
type unionGauge []Gauge
|
||||||
|
|
||||||
|
var _ Gauge = unionGauge(nil)
|
||||||
|
|
||||||
|
// NewUnionGauge constructs a Gauge that delegates to all of the given Gauges
|
||||||
|
func NewUnionGauge(elts ...Gauge) Gauge {
|
||||||
|
return unionGauge(elts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ug unionGauge) Set(x float64) {
|
||||||
|
for _, gauge := range ug {
|
||||||
|
gauge.Set(x)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ug unionGauge) Add(x float64) {
|
||||||
|
for _, gauge := range ug {
|
||||||
|
gauge.Add(x)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ug unionGauge) Inc() {
|
||||||
|
for _, gauge := range ug {
|
||||||
|
gauge.Inc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ug unionGauge) Dec() {
|
||||||
|
for _, gauge := range ug {
|
||||||
|
gauge.Dec()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ug unionGauge) SetToCurrentTime() {
|
||||||
|
for _, gauge := range ug {
|
||||||
|
gauge.SetToCurrentTime()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user