Add metric tracking the latest cached rv of informers

Kubernetes-commit: 9fb9e933b2ebae2d94edd29b7dec2c1df9e625a6
This commit is contained in:
Michael Aspinwall
2026-03-04 22:13:54 +00:00
committed by Kubernetes Publisher
parent 9cdd63fee2
commit aae801378e
6 changed files with 149 additions and 37 deletions

View File

@@ -452,9 +452,9 @@ type InformerOptions struct {
// If not set, metrics will not be published.
Identifier InformerNameAndResource
// FIFOMetricsProvider is the metrics provider for the FIFO queue.
// InformerMetricsProvider is the metrics provider for the informer.
// If not set, metrics will be no-ops.
FIFOMetricsProvider FIFOMetricsProvider
InformerMetricsProvider InformerMetricsProvider
}
// NewInformerWithOptions returns a Store and a controller for populating the store
@@ -464,9 +464,9 @@ type InformerOptions struct {
func NewInformerWithOptions(options InformerOptions) (Store, Controller) {
var clientState Store
if options.Indexers == nil {
clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider))
} else {
clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers)
clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider))
}
return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc)
}
@@ -814,7 +814,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
if options.Logger != nil {
logger = *options.Logger
}
logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider)
logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.InformerMetricsProvider)
cfg := &Config{
Queue: fifo,
@@ -844,7 +844,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
// It returns the FIFO and the logger used by the FIFO.
// That logger includes the name used for the FIFO,
// in contrast to the logger which was passed in.
func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) {
func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider InformerMetricsProvider) (klog.Logger, Queue) {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
options := RealFIFOOptions{
Logger: &logger,

View File

@@ -28,14 +28,14 @@ import (
)
var (
globalFIFOMetricsProvider FIFOMetricsProvider = noopFIFOMetricsProvider{}
setFIFOMetricsProviderOnce sync.Once
globalInformerMetricsProvider InformerMetricsProvider = noopInformerMetricsProvider{}
setInformerMetricsProviderOnce sync.Once
)
type noopFIFOMetricsProvider struct{}
type noopInformerMetricsProvider struct{}
// FIFOMetricsProvider defines an interface for creating metrics that track FIFO queue operations.
type FIFOMetricsProvider interface {
// InformerMetricsProvider defines an interface for creating metrics that track informer operations.
type InformerMetricsProvider interface {
// NewQueuedItemMetric returns a gauge metric for tracking the total number of items
// currently queued and waiting to be processed.
// The returned metric should check id.Reserved() before updating to support
@@ -51,6 +51,11 @@ type FIFOMetricsProvider interface {
// The returned metric should check id.Reserved() before updating to support
// dynamic informers that may shut down while the process is still running.
NewProcessingLatencyMetric(id InformerNameAndResource) HistogramMetric
// NewStoreResourceVersionMetric returns a gauge metric for tracking the resource version of the store.
// The returned metric should check id.Reserved() before updating to support
// dynamic informers that may shut down while the process is still running.
NewStoreResourceVersionMetric(id InformerNameAndResource) GaugeMetric
}
// fifoMetrics holds all metrics for a FIFO.
@@ -59,17 +64,22 @@ type fifoMetrics struct {
processingLatency HistogramMetric
}
// SetFIFOMetricsProvider sets the metrics provider for all subsequently created
// storeMetrics holds all metrics for a store.
type storeMetrics struct {
storeResourceVersion GaugeMetric
}
// SetInformerMetricsProvider sets the metrics provider for all subsequently created
// FIFOs. Only the first call has an effect.
func SetFIFOMetricsProvider(metricsProvider FIFOMetricsProvider) {
setFIFOMetricsProviderOnce.Do(func() {
globalFIFOMetricsProvider = metricsProvider
func SetInformerMetricsProvider(metricsProvider InformerMetricsProvider) {
setInformerMetricsProviderOnce.Do(func() {
globalInformerMetricsProvider = metricsProvider
})
}
func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvider) *fifoMetrics {
func newFIFOMetrics(id InformerNameAndResource, metricsProvider InformerMetricsProvider) *fifoMetrics {
if metricsProvider == nil {
metricsProvider = globalFIFOMetricsProvider
metricsProvider = globalInformerMetricsProvider
}
metrics := &fifoMetrics{
numberOfQueuedItem: noopMetric{},
@@ -84,10 +94,29 @@ func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvi
return metrics
}
func (noopFIFOMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric {
func newStoreMetrics(id InformerNameAndResource, metricsProvider InformerMetricsProvider) *storeMetrics {
if metricsProvider == nil {
metricsProvider = globalInformerMetricsProvider
}
metrics := &storeMetrics{
storeResourceVersion: noopMetric{},
}
if id.Reserved() {
metrics.storeResourceVersion = metricsProvider.NewStoreResourceVersionMetric(id)
}
return metrics
}
func (noopInformerMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric {
return noopMetric{}
}
func (noopFIFOMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric {
func (noopInformerMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric {
return noopMetric{}
}
func (noopInformerMetricsProvider) NewStoreResourceVersionMetric(InformerNameAndResource) GaugeMetric {
return noopMetric{}
}

View File

@@ -323,7 +323,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O
processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker())
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)),
processor: processor,
synced: make(chan struct{}),
listerWatcher: lw,
@@ -334,7 +334,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O
clock: realClock,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
identifier: options.Identifier,
fifoMetricsProvider: options.FIFOMetricsProvider,
informerMetricsProvider: options.InformerMetricsProvider,
keyFunc: DeletionHandlingMetaNamespaceKeyFunc,
}
}
@@ -356,9 +356,9 @@ type SharedIndexInformerOptions struct {
// If not set, metrics will not be published.
Identifier InformerNameAndResource
// FIFOMetricsProvider is the metrics provider for the FIFO queue.
// InformerMetricsProvider is the metrics provider for the FIFO queue.
// If not set, metrics will be no-ops.
FIFOMetricsProvider FIFOMetricsProvider
InformerMetricsProvider InformerMetricsProvider
}
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
@@ -630,8 +630,8 @@ type sharedIndexInformer struct {
// identifier is used to identify this informer for metrics and logging purposes.
identifier InformerNameAndResource
// fifoMetricsProvider is the metrics provider for the FIFO queue.
fifoMetricsProvider FIFOMetricsProvider
// informerMetricsProvider is the metrics provider for the FIFO queue.
informerMetricsProvider InformerMetricsProvider
// keyFunc is called when processing deltas by the underlying process function.
keyFunc KeyFunc
@@ -729,7 +729,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.informerMetricsProvider)
cfg := &Config{
Queue: fifo,

35
tools/cache/store.go vendored
View File

@@ -209,6 +209,10 @@ type cache struct {
keyFunc KeyFunc
// Called with every object put in the cache.
transformer TransformFunc
// identifier is used to identify the store for metrics.
identifier InformerNameAndResource
// metrics is the metrics provider for the store.
metrics InformerMetricsProvider
}
var _ Store = &cache{}
@@ -395,22 +399,41 @@ func WithTransformer(transformer TransformFunc) StoreOption {
}
}
func WithStoreMetrics(identifier InformerNameAndResource, metrics InformerMetricsProvider) StoreOption {
return func(c *cache) {
c.identifier = identifier
c.metrics = metrics
}
}
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store {
c := &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
keyFunc: keyFunc,
}
for _, opt := range opts {
opt(c)
}
threadSafeOpts := []ThreadSafeStoreOption{}
if c.metrics != nil {
threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics))
}
c.cacheStorage = NewThreadSafeStore(Indexers{}, Indices{}, threadSafeOpts...)
return c
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
func NewIndexer(keyFunc KeyFunc, indexers Indexers, opts ...StoreOption) Indexer {
c := &cache{
keyFunc: keyFunc,
}
for _, opt := range opts {
opt(c)
}
threadSafeOpts := []ThreadSafeStoreOption{}
if c.metrics != nil {
threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics))
}
c.cacheStorage = NewThreadSafeStore(indexers, Indices{}, threadSafeOpts...)
return c
}

View File

@@ -72,7 +72,7 @@ type RealFIFOOptions struct {
Identifier InformerNameAndResource
// MetricsProvider is used to create metrics for the FIFO.
MetricsProvider FIFOMetricsProvider
MetricsProvider InformerMetricsProvider
// EmitDeltaTypeBookmark is used to specify whether the RealFIFO will emit
// bookmark deltas or not. This can only be set if AtomicEvents is true.

View File

@@ -18,6 +18,7 @@ package cache
import (
"fmt"
"strconv"
"sync"
"time"
@@ -81,6 +82,14 @@ type ThreadSafeStoreTransaction struct {
Key string
}
type ThreadSafeStoreOption = func(*threadSafeMap)
func WithThreadSafeStoreMetrics(identifier InformerNameAndResource, metricsProvider InformerMetricsProvider) ThreadSafeStoreOption {
return func(c *threadSafeMap) {
c.metrics = newStoreMetrics(identifier, metricsProvider)
}
}
// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
// indexers maps a name to an IndexFunc
@@ -251,6 +260,10 @@ type threadSafeMap struct {
// index implements the indexing functionality
index *storeIndex
rv string
// metrics is used to expose metrics about the store
// and must be non-nil. If not provided, a noop implementation will be used.
metrics *storeMetrics
}
func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
@@ -259,6 +272,7 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
}
finalObj := txns[len(txns)-1].Object
rv, rvErr := rvFromObject(finalObj)
rvInt, parseErr := parseRVForMetricsWithTruncation(rv)
c.lock.Lock()
defer c.lock.Unlock()
trace := utiltrace.New("ThreadSafeMap Transaction Process",
@@ -278,6 +292,9 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
}
if rvErr == nil {
c.rv = rv
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
}
}
@@ -291,11 +308,15 @@ func (c *threadSafeMap) addLocked(key string, obj interface{}) {
func (c *threadSafeMap) Update(key string, obj interface{}) {
rv, rvErr := rvFromObject(obj)
rvInt, parseErr := parseRVForMetricsWithTruncation(rv)
c.lock.Lock()
defer c.lock.Unlock()
c.updateLocked(key, obj)
if rvErr == nil {
c.rv = rv
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
}
}
@@ -311,15 +332,20 @@ func (c *threadSafeMap) Delete(key string) {
func (c *threadSafeMap) DeleteWithObject(key string, obj interface{}) {
var rv string
var rvErr error
var rvInt int64
var rvErr, parseErr error
if obj != nil {
rv, rvErr = rvFromObject(obj)
rvInt, parseErr = parseRVForMetricsWithTruncation(rv)
}
c.lock.Lock()
defer c.lock.Unlock()
c.deleteLocked(key)
if obj != nil && rvErr == nil {
c.rv = rv
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
}
}
@@ -360,10 +386,18 @@ func (c *threadSafeMap) ListKeys() []string {
}
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
var rvInt int64
var parseErr error
if resourceVersion != "" {
rvInt, parseErr = parseRVForMetricsWithTruncation(resourceVersion)
}
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
c.rv = resourceVersion
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
// rebuild any index
c.index.reset()
for key, item := range c.items {
@@ -411,9 +445,17 @@ func (c *threadSafeMap) LastStoreSyncResourceVersion() string {
// Bookmark sets the latest resource version that the store has seen.
func (c *threadSafeMap) Bookmark(rv string) {
var rvInt int64
var parseErr error
if rv != "" {
rvInt, parseErr = parseRVForMetricsWithTruncation(rv)
}
c.lock.Lock()
defer c.lock.Unlock()
c.rv = rv
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
}
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
@@ -480,13 +522,31 @@ func (c *threadSafeMap) Resync() error {
return nil
}
// NewThreadSafeStore creates a new instance of ThreadSafeStore.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
func NewThreadSafeStore(indexers Indexers, indices Indices, opts ...ThreadSafeStoreOption) ThreadSafeStore {
store := &threadSafeMap{
items: map[string]interface{}{},
index: &storeIndex{
indexers: indexers,
indices: indices,
},
}
for _, opt := range opts {
opt(store)
}
if store.metrics == nil {
store.metrics = newStoreMetrics(InformerNameAndResource{}, noopInformerMetricsProvider{})
}
return store
}
func parseRVForMetricsWithTruncation(rv string) (int64, error) {
if rv == "" {
return 0, nil
}
// Truncate to last 15 digits to ensure metrics are always less than 2^53-1
// and avoid imprecise float64 representation.
if len(rv) > 15 {
rv = rv[len(rv)-15:]
}
return strconv.ParseInt(rv, 10, 64)
}