diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go index 3400e5aad23..9146a7c8296 100644 --- a/pkg/scheduler/metrics/metric_recorder.go +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -80,9 +80,9 @@ func (r *PendingPodsRecorder) Clear() { r.recorder.Set(float64(0)) } -// histgramVecMetric is the data structure passed in the buffer channel between the main framework thread +// histogramVecMetric is the data structure passed in the buffer channel between the main framework thread // and the metricsRecorder goroutine. -type histgramVecMetric struct { +type histogramVecMetric struct { metric *metrics.HistogramVec labelValues []string value float64 @@ -102,7 +102,7 @@ type gaugeVecMetricKey struct { // MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path. type MetricAsyncRecorder struct { // bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it. - bufferCh chan *histgramVecMetric + bufferCh chan *histogramVecMetric // if bufferSize is reached, incoming metrics will be discarded. bufferSize int // how often the recorder runs to flush the metrics. @@ -125,7 +125,7 @@ type MetricAsyncRecorder struct { func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder { recorder := &MetricAsyncRecorder{ - bufferCh: make(chan *histgramVecMetric, bufferSize), + bufferCh: make(chan *histogramVecMetric, bufferSize), bufferSize: bufferSize, interval: interval, stopCh: stopCh, @@ -156,8 +156,9 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) { r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd) - // Only flush the metric to the channal if the interval is reached. 
+ // Only flush the metric to the channel if the interval is reached. // The values are flushed to Prometheus in the run() function, which runs once the interval time. + // Note: the flush is done here rather than in FlushMetrics because doing it there would require guarding the map with a lock, which we want to avoid. if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval { for key, value := range r.aggregatedInflightEventMetric { newMetric := &gaugeVecMetric{ @@ -171,11 +172,13 @@ func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valu } } r.aggregatedInflightEventMetricLastFlushTime = time.Now() + // Reset the aggregation so that values already flushed are not sent again by the next flush. + r.aggregatedInflightEventMetric = make(map[gaugeVecMetricKey]int) } } func (r *MetricAsyncRecorder) observeMetricAsync(m *metrics.HistogramVec, value float64, labelsValues ...string) { - newMetric := &histgramVecMetric{ + newMetric := &histogramVecMetric{ metric: m, labelValues: labelsValues, value: value, diff --git a/pkg/scheduler/metrics/metric_recorder_test.go b/pkg/scheduler/metrics/metric_recorder_test.go index dabf2b153e8..3abf6a4dacd 100644 --- a/pkg/scheduler/metrics/metric_recorder_test.go +++ b/pkg/scheduler/metrics/metric_recorder_test.go @@ -131,6 +131,9 @@ func TestInFlightEventAsync(t *testing.T) { // It adds -4 and flushes the metric to the channel. 
r.ObserveInFlightEventsAsync(podAddLabel, -4) + if len(r.aggregatedInflightEventMetric) != 0 { + t.Errorf("aggregatedInflightEventMetric should be cleared, but got: %v", r.aggregatedInflightEventMetric) + } got := []gaugeVecMetric{} for { diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 992c01e0456..a2fd58ba801 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -149,7 +149,7 @@ var ( &metrics.GaugeOpts{ Subsystem: SchedulerSubsystem, Name: "inflight_events", - Help: "Number of events recorded in the scheduling queue.", + Help: "Number of events currently tracked in the scheduling queue.", StabilityLevel: metrics.ALPHA, }, []string{"event"}) Goroutines = metrics.NewGaugeVec(