Add FIFO queue depth metrics

Kubernetes-commit: 864357774f24ee17b2bc7bde84eb5b87cc7ab95b
This commit is contained in:
Richa Banker
2026-02-05 11:42:20 -08:00
committed by Kubernetes Publisher
parent 50ef81ad18
commit 117e93e87c
2 changed files with 125 additions and 9 deletions

79
tools/cache/fifo_metrics.go vendored Normal file
View File

@@ -0,0 +1,79 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cache is a client-side caching mechanism. It is useful for
// reducing the number of server calls you'd otherwise need to make.
// Reflector watches a server and updates a Store. Two stores are provided;
// one that simply caches objects (for example, to allow a scheduler to
// list currently available nodes), and one that additionally acts as
// a FIFO queue (for example, to allow a scheduler to process incoming
// pods).
package cache
import (
"sync"
)
var (
globalFIFOMetricsProvider FIFOMetricsProvider = noopFIFOMetricsProvider{}
setFIFOMetricsProviderOnce sync.Once
)
type noopFIFOMetricsProvider struct{}
// FIFOMetricsProvider defines an interface for creating metrics that track FIFO queue operations.
type FIFOMetricsProvider interface {
// NewQueuedItemMetric returns a gauge metric for tracking the total number of items
// currently queued and waiting to be processed.
// The returned metric should check id.Reserved() before updating to support
// dynamic informers that may shut down while the process is still running.
//
// For DeltaFIFO: Represents len(f.items) - the number of unique keys with pending deltas
// For RealFIFO: Represents len(f.items) - the total number of individual delta events queued
NewQueuedItemMetric(id InformerNameAndResource) GaugeMetric
}
// fifoMetrics holds all metrics for a FIFO.
type fifoMetrics struct {
numberOfQueuedItem GaugeMetric
}
// SetFIFOMetricsProvider sets the metrics provider for all subsequently created
// FIFOs. Only the first call has an effect.
func SetFIFOMetricsProvider(metricsProvider FIFOMetricsProvider) {
setFIFOMetricsProviderOnce.Do(func() {
globalFIFOMetricsProvider = metricsProvider
})
}
func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvider) *fifoMetrics {
if metricsProvider == nil {
metricsProvider = globalFIFOMetricsProvider
}
metrics := &fifoMetrics{
numberOfQueuedItem: noopMetric{},
}
if id.Reserved() {
metrics.numberOfQueuedItem = metricsProvider.NewQueuedItemMetric(id)
}
return metrics
}
func (noopFIFOMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric {
return noopMetric{}
}

View File

@@ -56,6 +56,14 @@ type RealFIFOOptions struct {
// while processing events to allow other goroutines to add items to the queue.
// If UnlockWhileProcessing is true, AtomicEvents must be true as well.
UnlockWhileProcessing bool
// Identifier is used to identify this FIFO for metrics and logging purposes.
// Optional. If zero value, metrics will not be published and trace logs will not
// include Name or Resource fields.
Identifier InformerNameAndResource
// MetricsProvider is used to create metrics for the FIFO.
MetricsProvider FIFOMetricsProvider
}
const (
@@ -113,6 +121,12 @@ type RealFIFO struct {
// This may only be set if emitAtomicEvents is true. If unlockWhileProcessing is true,
// Pop and PopBatch must be called from a single threaded consumer.
unlockWhileProcessing bool
// identifier is used to identify this FIFO for metrics and logging purposes.
identifier InformerNameAndResource
// metrics holds all metrics for this FIFO.
metrics *fifoMetrics
}
// ReplacedAllInfo is the object associated with a Delta of type=ReplacedAll
@@ -209,6 +223,7 @@ func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bo
Object: obj,
})
f.cond.Broadcast()
f.metrics.numberOfQueuedItem.Set(float64(len(f.items)))
return nil
}
@@ -238,6 +253,7 @@ func (f *RealFIFO) addReplaceToItemsLocked(objs []interface{}, resourceVersion s
Object: info,
})
f.cond.Broadcast()
f.metrics.numberOfQueuedItem.Set(float64(len(f.items)))
return nil
}
@@ -248,6 +264,7 @@ func (f *RealFIFO) addResyncToItemsLocked() error {
Object: SyncAllInfo{},
})
f.cond.Broadcast()
f.metrics.numberOfQueuedItem.Set(float64(len(f.items)))
return nil
}
@@ -340,12 +357,21 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// https://github.com/kubernetes/kubernetes/issues/103789
if len(f.items) > 10 {
id, _ := f.keyOf(item)
trace := utiltrace.New("RealFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: len(f.items)},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
fields := []utiltrace.Field{
{Key: "ID", Value: id},
{Key: "Depth", Value: len(f.items)},
{Key: "Reason", Value: "slow event handlers blocking the queue"},
}
if name := f.identifier.Name(); len(name) > 0 {
fields = append(fields, utiltrace.Field{Key: "Name", Value: name})
}
if gvr := f.identifier.GroupVersionResource(); !gvr.Empty() {
fields = append(fields, utiltrace.Field{Key: "Resource", Value: gvr})
}
trace := utiltrace.New("RealFIFO Pop Process", fields...)
defer trace.LogIfLong(100 * time.Millisecond)
}
f.metrics.numberOfQueuedItem.Set(float64(len(f.items)))
// Process the item, this may unlock the lock, and allow other goroutines to add items to the queue.
err := f.whileProcessing_locked(func() error {
@@ -459,13 +485,22 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
// https://github.com/kubernetes/kubernetes/issues/103789
if len(f.items) > 10 {
id, _ := f.keyOf(deltas[0])
trace := utiltrace.New("RealFIFO PopBatch Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: len(f.items)},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"},
utiltrace.Field{Key: "BatchSize", Value: len(deltas)})
fields := []utiltrace.Field{
{Key: "ID", Value: id},
{Key: "Depth", Value: len(f.items)},
{Key: "Reason", Value: "slow event handlers blocking the queue"},
{Key: "BatchSize", Value: len(deltas)},
}
if name := f.identifier.Name(); len(name) > 0 {
fields = append(fields, utiltrace.Field{Key: "Name", Value: name})
}
if gvr := f.identifier.GroupVersionResource(); !gvr.Empty() {
fields = append(fields, utiltrace.Field{Key: "Resource", Value: gvr})
}
trace := utiltrace.New("RealFIFO PopBatch Process", fields...)
defer trace.LogIfLong(min(100*time.Millisecond*time.Duration(len(deltas)), time.Second))
}
f.metrics.numberOfQueuedItem.Set(float64(len(f.items)))
if len(deltas) == 1 {
return f.whileProcessing_locked(func() error {
@@ -711,6 +746,8 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
batchSize: defaultBatchSize,
emitAtomicEvents: opts.AtomicEvents,
unlockWhileProcessing: opts.UnlockWhileProcessing,
identifier: opts.Identifier,
metrics: newFIFOMetrics(opts.Identifier, opts.MetricsProvider),
}
f.cond.L = &f.lock