fix: clear aggregatedInflightEventMetric after flushing

This commit is contained in:
Kensei Nakada 2024-09-02 21:53:31 +09:00
parent 110d28355d
commit 0ac5d745fe
3 changed files with 13 additions and 7 deletions

View File

@ -80,9 +80,9 @@ func (r *PendingPodsRecorder) Clear() {
r.recorder.Set(float64(0)) r.recorder.Set(float64(0))
} }
// histgramVecMetric is the data structure passed in the buffer channel between the main framework thread // histogramVecMetric is the data structure passed in the buffer channel between the main framework thread
// and the metricsRecorder goroutine. // and the metricsRecorder goroutine.
type histgramVecMetric struct { type histogramVecMetric struct {
metric *metrics.HistogramVec metric *metrics.HistogramVec
labelValues []string labelValues []string
value float64 value float64
@ -102,7 +102,7 @@ type gaugeVecMetricKey struct {
// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path. // MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
type MetricAsyncRecorder struct { type MetricAsyncRecorder struct {
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it. // bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
bufferCh chan *histgramVecMetric bufferCh chan *histogramVecMetric
// if bufferSize is reached, incoming metrics will be discarded. // if bufferSize is reached, incoming metrics will be discarded.
bufferSize int bufferSize int
// how often the recorder runs to flush the metrics. // how often the recorder runs to flush the metrics.
@ -125,7 +125,7 @@ type MetricAsyncRecorder struct {
func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder { func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder {
recorder := &MetricAsyncRecorder{ recorder := &MetricAsyncRecorder{
bufferCh: make(chan *histgramVecMetric, bufferSize), bufferCh: make(chan *histogramVecMetric, bufferSize),
bufferSize: bufferSize, bufferSize: bufferSize,
interval: interval, interval: interval,
stopCh: stopCh, stopCh: stopCh,
@ -156,8 +156,9 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event
func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) { func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) {
r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd) r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd)
// Only flush the metric to the channal if the interval is reached. // Only flush the metric to the channel if the interval is reached.
// The values are flushed to Prometheus in the run() function, which runs once the interval time. // The values are flushed to Prometheus in the run() function, which runs once the interval time.
// Note: we implement this flushing here, not in FlushMetrics, because, if we did so, we would need to implement a lock for the map, which we want to avoid.
if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval { if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval {
for key, value := range r.aggregatedInflightEventMetric { for key, value := range r.aggregatedInflightEventMetric {
newMetric := &gaugeVecMetric{ newMetric := &gaugeVecMetric{
@ -171,11 +172,13 @@ func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valu
} }
} }
r.aggregatedInflightEventMetricLastFlushTime = time.Now() r.aggregatedInflightEventMetricLastFlushTime = time.Now()
// reset
r.aggregatedInflightEventMetric = make(map[gaugeVecMetricKey]int)
} }
} }
func (r *MetricAsyncRecorder) observeMetricAsync(m *metrics.HistogramVec, value float64, labelsValues ...string) { func (r *MetricAsyncRecorder) observeMetricAsync(m *metrics.HistogramVec, value float64, labelsValues ...string) {
newMetric := &histgramVecMetric{ newMetric := &histogramVecMetric{
metric: m, metric: m,
labelValues: labelsValues, labelValues: labelsValues,
value: value, value: value,

View File

@ -131,6 +131,9 @@ func TestInFlightEventAsync(t *testing.T) {
// It adds -4 and flushes the metric to the channel. // It adds -4 and flushes the metric to the channel.
r.ObserveInFlightEventsAsync(podAddLabel, -4) r.ObserveInFlightEventsAsync(podAddLabel, -4)
if len(r.aggregatedInflightEventMetric) != 0 {
t.Errorf("aggregatedInflightEventMetric should be cleared, but got: %v", r.aggregatedInflightEventMetric)
}
got := []gaugeVecMetric{} got := []gaugeVecMetric{}
for { for {

View File

@ -149,7 +149,7 @@ var (
&metrics.GaugeOpts{ &metrics.GaugeOpts{
Subsystem: SchedulerSubsystem, Subsystem: SchedulerSubsystem,
Name: "inflight_events", Name: "inflight_events",
Help: "Number of events recorded in the scheduling queue.", Help: "Number of events currently tracked in the scheduling queue.",
StabilityLevel: metrics.ALPHA, StabilityLevel: metrics.ALPHA,
}, []string{"event"}) }, []string{"event"})
Goroutines = metrics.NewGaugeVec( Goroutines = metrics.NewGaugeVec(