diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 55ab62c76..b78ed522e 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -452,9 +452,9 @@ type InformerOptions struct { // If not set, metrics will not be published. Identifier InformerNameAndResource - // FIFOMetricsProvider is the metrics provider for the FIFO queue. + // InformerMetricsProvider is the metrics provider for the informer. // If not set, metrics will be no-ops. - FIFOMetricsProvider FIFOMetricsProvider + InformerMetricsProvider InformerMetricsProvider } // NewInformerWithOptions returns a Store and a controller for populating the store @@ -464,9 +464,9 @@ type InformerOptions struct { func NewInformerWithOptions(options InformerOptions) (Store, Controller) { var clientState Store if options.Indexers == nil { - clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc) + clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)) } else { - clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers) + clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)) } return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc) } @@ -814,7 +814,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co if options.Logger != nil { logger = *options.Logger } - logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) + logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.InformerMetricsProvider) cfg := &Config{ Queue: fifo, @@ -844,7 +844,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co // It returns the FIFO and the logger used by the FIFO. // That logger includes the name used for the FIFO, // in contrast to the logger which was passed in. -func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) { +func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider InformerMetricsProvider) (klog.Logger, Queue) { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { options := RealFIFOOptions{ Logger: &logger, diff --git a/tools/cache/fifo_metrics.go b/tools/cache/fifo_metrics.go index 9e8ee069b..7a6781724 100644 --- a/tools/cache/fifo_metrics.go +++ b/tools/cache/fifo_metrics.go @@ -28,14 +28,14 @@ import ( ) var ( - globalFIFOMetricsProvider FIFOMetricsProvider = noopFIFOMetricsProvider{} - setFIFOMetricsProviderOnce sync.Once + globalInformerMetricsProvider InformerMetricsProvider = noopInformerMetricsProvider{} + setInformerMetricsProviderOnce sync.Once ) -type noopFIFOMetricsProvider struct{} +type noopInformerMetricsProvider struct{} -// FIFOMetricsProvider defines an interface for creating metrics that track FIFO queue operations. -type FIFOMetricsProvider interface { +// InformerMetricsProvider defines an interface for creating metrics that track informer operations. +type InformerMetricsProvider interface { // NewQueuedItemMetric returns a gauge metric for tracking the total number of items // currently queued and waiting to be processed. // The returned metric should check id.Reserved() before updating to support @@ -51,6 +51,11 @@ type FIFOMetricsProvider interface { // The returned metric should check id.Reserved() before updating to support // dynamic informers that may shut down while the process is still running. NewProcessingLatencyMetric(id InformerNameAndResource) HistogramMetric + + // NewStoreResourceVersionMetric returns a gauge metric for tracking the resource version of the store. + // The returned metric should check id.Reserved() before updating to support + // dynamic informers that may shut down while the process is still running. + NewStoreResourceVersionMetric(id InformerNameAndResource) GaugeMetric } // fifoMetrics holds all metrics for a FIFO. @@ -59,17 +64,22 @@ type fifoMetrics struct { processingLatency HistogramMetric } -// SetFIFOMetricsProvider sets the metrics provider for all subsequently created +// storeMetrics holds all metrics for a store. +type storeMetrics struct { + storeResourceVersion GaugeMetric +} + +// SetInformerMetricsProvider sets the metrics provider for all subsequently created // FIFOs. Only the first call has an effect. -func SetFIFOMetricsProvider(metricsProvider FIFOMetricsProvider) { - setFIFOMetricsProviderOnce.Do(func() { - globalFIFOMetricsProvider = metricsProvider +func SetInformerMetricsProvider(metricsProvider InformerMetricsProvider) { + setInformerMetricsProviderOnce.Do(func() { + globalInformerMetricsProvider = metricsProvider }) } -func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvider) *fifoMetrics { +func newFIFOMetrics(id InformerNameAndResource, metricsProvider InformerMetricsProvider) *fifoMetrics { if metricsProvider == nil { - metricsProvider = globalFIFOMetricsProvider + metricsProvider = globalInformerMetricsProvider } metrics := &fifoMetrics{ numberOfQueuedItem: noopMetric{}, @@ -84,10 +94,29 @@ func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvi return metrics } -func (noopFIFOMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric { +func newStoreMetrics(id InformerNameAndResource, metricsProvider InformerMetricsProvider) *storeMetrics { + if metricsProvider == nil { + metricsProvider = globalInformerMetricsProvider + } + metrics := &storeMetrics{ + storeResourceVersion: noopMetric{}, + } + + if id.Reserved() { + metrics.storeResourceVersion = metricsProvider.NewStoreResourceVersionMetric(id) + } + + return metrics +} + +func (noopInformerMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric { return noopMetric{} } -func (noopFIFOMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric { +func (noopInformerMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric { + return noopMetric{} +} + +func (noopInformerMetricsProvider) NewStoreResourceVersionMetric(InformerNameAndResource) GaugeMetric { return noopMetric{} } diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 753de1e41..4cec4283f 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -323,7 +323,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker()) return &sharedIndexInformer{ - indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), + indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)), processor: processor, synced: make(chan struct{}), listerWatcher: lw, @@ -334,7 +334,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O clock: realClock, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), identifier: options.Identifier, - fifoMetricsProvider: options.FIFOMetricsProvider, + informerMetricsProvider: options.InformerMetricsProvider, keyFunc: DeletionHandlingMetaNamespaceKeyFunc, } } @@ -356,9 +356,9 @@ type SharedIndexInformerOptions struct { // If not set, metrics will not be published. Identifier InformerNameAndResource - // FIFOMetricsProvider is the metrics provider for the FIFO queue. + // InformerMetricsProvider is the metrics provider for the FIFO queue. // If not set, metrics will be no-ops. - FIFOMetricsProvider FIFOMetricsProvider + InformerMetricsProvider InformerMetricsProvider } // InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. @@ -630,8 +630,8 @@ type sharedIndexInformer struct { // identifier is used to identify this informer for metrics and logging purposes. identifier InformerNameAndResource - // fifoMetricsProvider is the metrics provider for the FIFO queue. - fifoMetricsProvider FIFOMetricsProvider + // informerMetricsProvider is the metrics provider for the FIFO queue. + informerMetricsProvider InformerMetricsProvider // keyFunc is called when processing deltas by the underlying process function. keyFunc KeyFunc @@ -729,7 +729,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) + logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.informerMetricsProvider) cfg := &Config{ Queue: fifo, diff --git a/tools/cache/store.go b/tools/cache/store.go index 805d42cee..261ed60d9 100644 --- a/tools/cache/store.go +++ b/tools/cache/store.go @@ -209,6 +209,10 @@ type cache struct { keyFunc KeyFunc // Called with every object put in the cache. transformer TransformFunc + // identifier is used to identify the store for metrics. + identifier InformerNameAndResource + // metrics is the metrics provider for the store. + metrics InformerMetricsProvider } var _ Store = &cache{} @@ -395,22 +399,41 @@ func WithTransformer(transformer TransformFunc) StoreOption { } } +func WithStoreMetrics(identifier InformerNameAndResource, metrics InformerMetricsProvider) StoreOption { + return func(c *cache) { + c.identifier = identifier + c.metrics = metrics + } +} + // NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store { c := &cache{ - cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), - keyFunc: keyFunc, + keyFunc: keyFunc, } for _, opt := range opts { opt(c) } + threadSafeOpts := []ThreadSafeStoreOption{} + if c.metrics != nil { + threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics)) + } + c.cacheStorage = NewThreadSafeStore(Indexers{}, Indices{}, threadSafeOpts...) return c } // NewIndexer returns an Indexer implemented simply with a map and a lock. -func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { - return &cache{ - cacheStorage: NewThreadSafeStore(indexers, Indices{}), - keyFunc: keyFunc, +func NewIndexer(keyFunc KeyFunc, indexers Indexers, opts ...StoreOption) Indexer { + c := &cache{ + keyFunc: keyFunc, } + for _, opt := range opts { + opt(c) + } + threadSafeOpts := []ThreadSafeStoreOption{} + if c.metrics != nil { + threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics)) + } + c.cacheStorage = NewThreadSafeStore(indexers, Indices{}, threadSafeOpts...) + return c } diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index bd6a3791f..f29c66653 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -72,7 +72,7 @@ type RealFIFOOptions struct { Identifier InformerNameAndResource // MetricsProvider is used to create metrics for the FIFO. - MetricsProvider FIFOMetricsProvider + MetricsProvider InformerMetricsProvider // EmitDeltaTypeBookmark is used to specify whether the RealFIFO will emit // bookmark deltas or not. This can only be set if AtomicEvents is true. diff --git a/tools/cache/thread_safe_store.go b/tools/cache/thread_safe_store.go index a306468ec..74ac8f1ab 100644 --- a/tools/cache/thread_safe_store.go +++ b/tools/cache/thread_safe_store.go @@ -18,6 +18,7 @@ package cache import ( "fmt" + "strconv" "sync" "time" @@ -81,6 +82,14 @@ type ThreadSafeStoreTransaction struct { Key string } +type ThreadSafeStoreOption = func(*threadSafeMap) + +func WithThreadSafeStoreMetrics(identifier InformerNameAndResource, metricsProvider InformerMetricsProvider) ThreadSafeStoreOption { + return func(c *threadSafeMap) { + c.metrics = newStoreMetrics(identifier, metricsProvider) + } +} + // storeIndex implements the indexing functionality for Store interface type storeIndex struct { // indexers maps a name to an IndexFunc @@ -251,6 +260,10 @@ type threadSafeMap struct { // index implements the indexing functionality index *storeIndex rv string + + // metrics is used to expose metrics about the store + // and must be non-nil. If not provided, a noop implementation will be used. + metrics *storeMetrics } func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { @@ -259,6 +272,7 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { } finalObj := txns[len(txns)-1].Object rv, rvErr := rvFromObject(finalObj) + rvInt, parseErr := parseRVForMetricsWithTruncation(rv) c.lock.Lock() defer c.lock.Unlock() trace := utiltrace.New("ThreadSafeMap Transaction Process", @@ -278,6 +292,9 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { } if rvErr == nil { c.rv = rv + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } } } @@ -291,11 +308,15 @@ func (c *threadSafeMap) addLocked(key string, obj interface{}) { func (c *threadSafeMap) Update(key string, obj interface{}) { rv, rvErr := rvFromObject(obj) + rvInt, parseErr := parseRVForMetricsWithTruncation(rv) c.lock.Lock() defer c.lock.Unlock() c.updateLocked(key, obj) if rvErr == nil { c.rv = rv + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } } } @@ -311,15 +332,20 @@ func (c *threadSafeMap) Delete(key string) { func (c *threadSafeMap) DeleteWithObject(key string, obj interface{}) { var rv string - var rvErr error + var rvInt int64 + var rvErr, parseErr error if obj != nil { rv, rvErr = rvFromObject(obj) + rvInt, parseErr = parseRVForMetricsWithTruncation(rv) } c.lock.Lock() defer c.lock.Unlock() c.deleteLocked(key) if obj != nil && rvErr == nil { c.rv = rv + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } } } @@ -360,10 +386,18 @@ func (c *threadSafeMap) ListKeys() []string { } func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) { + var rvInt int64 + var parseErr error + if resourceVersion != "" { + rvInt, parseErr = parseRVForMetricsWithTruncation(resourceVersion) + } c.lock.Lock() defer c.lock.Unlock() c.items = items c.rv = resourceVersion + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } // rebuild any index c.index.reset() for key, item := range c.items { @@ -411,9 +445,17 @@ func (c *threadSafeMap) LastStoreSyncResourceVersion() string { // Bookmark sets the latest resource version that the store has seen. func (c *threadSafeMap) Bookmark(rv string) { + var rvInt int64 + var parseErr error + if rv != "" { + rvInt, parseErr = parseRVForMetricsWithTruncation(rv) + } c.lock.Lock() defer c.lock.Unlock() c.rv = rv + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } } // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value @@ -480,13 +522,31 @@ func (c *threadSafeMap) Resync() error { return nil } -// NewThreadSafeStore creates a new instance of ThreadSafeStore. -func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { - return &threadSafeMap{ +func NewThreadSafeStore(indexers Indexers, indices Indices, opts ...ThreadSafeStoreOption) ThreadSafeStore { + store := &threadSafeMap{ items: map[string]interface{}{}, index: &storeIndex{ indexers: indexers, indices: indices, }, } + for _, opt := range opts { + opt(store) + } + if store.metrics == nil { + store.metrics = newStoreMetrics(InformerNameAndResource{}, noopInformerMetricsProvider{}) + } + return store +} + +func parseRVForMetricsWithTruncation(rv string) (int64, error) { + if rv == "" { + return 0, nil + } + // Truncate to last 15 digits to ensure metrics are always less than 2^53-1 + // and avoid imprecise float64 representation. + if len(rv) > 15 { + rv = rv[len(rv)-15:] + } + return strconv.ParseInt(rv, 10, 64) }