Add processing latency metric for RealFIFO

Kubernetes-commit: d049bd123391f4c1a86e493888560d8549a52dc2
This commit is contained in:
Richa Banker
2026-02-16 15:15:47 -08:00
committed by Kubernetes Publisher
parent 70ab833f66
commit 59f55827e6
3 changed files with 25 additions and 2 deletions

View File

@@ -44,11 +44,19 @@ type FIFOMetricsProvider interface {
// For DeltaFIFO: Represents len(f.items) - the number of unique keys with pending deltas
// For RealFIFO: Represents len(f.items) - the total number of individual delta events queued
NewQueuedItemMetric(id InformerNameAndResource) GaugeMetric
// NewProcessingLatencyMetric returns a histogram metric for tracking the time taken
// to process events (execute handlers) after they are popped from the queue.
// The latency is measured in seconds.
// The returned metric should check id.Reserved() before updating to support
// dynamic informers that may shut down while the process is still running.
NewProcessingLatencyMetric(id InformerNameAndResource) HistogramMetric
}
// fifoMetrics holds all metrics for a FIFO.
type fifoMetrics struct {
numberOfQueuedItem GaugeMetric
processingLatency HistogramMetric
}
// SetFIFOMetricsProvider sets the metrics provider for all subsequently created
@@ -65,10 +73,12 @@ func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvi
}
metrics := &fifoMetrics{
numberOfQueuedItem: noopMetric{},
processingLatency: noopMetric{},
}
if id.Reserved() {
metrics.numberOfQueuedItem = metricsProvider.NewQueuedItemMetric(id)
metrics.processingLatency = metricsProvider.NewProcessingLatencyMetric(id)
}
return metrics
@@ -77,3 +87,7 @@ func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvi
func (noopFIFOMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric {
return noopMetric{}
}
func (noopFIFOMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric {
return noopMetric{}
}

View File

@@ -40,6 +40,12 @@ type SummaryMetric interface {
Observe(float64)
}
// HistogramMetric captures individual observations into configurable buckets.
// It also provides a sum and count of observations.
type HistogramMetric interface {
Observe(float64)
}
type noopMetric struct{}
func (noopMetric) Inc() {}

View File

@@ -448,7 +448,7 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
return Deltas{item}, err
}
// whileProcessing_locked calls the `process` function.
// whileProcessing_locked calls the `process` function and records processing latency.
// The lock must be held before calling `whileProcessing_locked`, and is held when `whileProcessing_locked` returns.
// whileProcessing_locked releases the lock during the call to `process` if f.unlockWhileProcessing is true and the f.items queue is not too long.
func (f *RealFIFO) whileProcessing_locked(process func() error) error {
@@ -459,7 +459,10 @@ func (f *RealFIFO) whileProcessing_locked(process func() error) error {
f.lock.Unlock()
defer f.lock.Lock()
}
return process()
startTime := time.Now()
err := process()
f.metrics.processingLatency.Observe(time.Since(startTime).Seconds())
return err
}
// batchable stores the delta types that can be batched