diff --git a/tools/cache/fifo_metrics.go b/tools/cache/fifo_metrics.go new file mode 100644 index 000000000..613dcb7be --- /dev/null +++ b/tools/cache/fifo_metrics.go @@ -0,0 +1,79 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package cache is a client-side caching mechanism. It is useful for +// reducing the number of server calls you'd otherwise need to make. +// Reflector watches a server and updates a Store. Two stores are provided; +// one that simply caches objects (for example, to allow a scheduler to +// list currently available nodes), and one that additionally acts as +// a FIFO queue (for example, to allow a scheduler to process incoming +// pods). +package cache + +import ( + "sync" +) + +var ( + globalFIFOMetricsProvider FIFOMetricsProvider = noopFIFOMetricsProvider{} + setFIFOMetricsProviderOnce sync.Once +) + +type noopFIFOMetricsProvider struct{} + +// FIFOMetricsProvider defines an interface for creating metrics that track FIFO queue operations. +type FIFOMetricsProvider interface { + // NewQueuedItemMetric returns a gauge metric for tracking the total number of items + // currently queued and waiting to be processed. + // The returned metric should check id.Reserved() before updating to support + // dynamic informers that may shut down while the process is still running. + // + // For DeltaFIFO: Represents len(f.items) - the number of unique keys with pending deltas + // For RealFIFO: Represents len(f.items) - the total number of individual delta events queued + NewQueuedItemMetric(id InformerNameAndResource) GaugeMetric +} + +// fifoMetrics holds all metrics for a FIFO. +type fifoMetrics struct { + numberOfQueuedItem GaugeMetric +} + +// SetFIFOMetricsProvider sets the metrics provider for all subsequently created +// FIFOs. Only the first call has an effect. +func SetFIFOMetricsProvider(metricsProvider FIFOMetricsProvider) { + setFIFOMetricsProviderOnce.Do(func() { + globalFIFOMetricsProvider = metricsProvider + }) +} + +func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvider) *fifoMetrics { + if metricsProvider == nil { + metricsProvider = globalFIFOMetricsProvider + } + metrics := &fifoMetrics{ + numberOfQueuedItem: noopMetric{}, + } + + if id.Reserved() { + metrics.numberOfQueuedItem = metricsProvider.NewQueuedItemMetric(id) + } + + return metrics +} + +func (noopFIFOMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric { + return noopMetric{} +} diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index 96d282f2c..5ac89c945 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -56,6 +56,14 @@ type RealFIFOOptions struct { // while processing events to allow other goroutines to add items to the queue. // If UnlockWhileProcessing is true, AtomicEvents must be true as well. UnlockWhileProcessing bool + + // Identifier is used to identify this FIFO for metrics and logging purposes. + // Optional. If zero value, metrics will not be published and trace logs will not + // include Name or Resource fields. + Identifier InformerNameAndResource + + // MetricsProvider is used to create metrics for the FIFO. + MetricsProvider FIFOMetricsProvider } const ( @@ -113,6 +121,12 @@ type RealFIFO struct { // This may only be set if emitAtomicEvents is true. If unlockWhileProcessing is true, // Pop and PopBatch must be called from a single threaded consumer. unlockWhileProcessing bool + + // identifier is used to identify this FIFO for metrics and logging purposes. + identifier InformerNameAndResource + + // metrics holds all metrics for this FIFO. + metrics *fifoMetrics } // ReplacedAllInfo is the object associated with a Delta of type=ReplacedAll @@ -209,6 +223,7 @@ func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bo Object: obj, }) f.cond.Broadcast() + f.metrics.numberOfQueuedItem.Set(float64(len(f.items))) return nil } @@ -238,6 +253,7 @@ func (f *RealFIFO) addReplaceToItemsLocked(objs []interface{}, resourceVersion s Object: info, }) f.cond.Broadcast() + f.metrics.numberOfQueuedItem.Set(float64(len(f.items))) return nil } @@ -248,6 +264,7 @@ func (f *RealFIFO) addResyncToItemsLocked() error { Object: SyncAllInfo{}, }) f.cond.Broadcast() + f.metrics.numberOfQueuedItem.Set(float64(len(f.items))) return nil } @@ -340,12 +357,21 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { // https://github.com/kubernetes/kubernetes/issues/103789 if len(f.items) > 10 { id, _ := f.keyOf(item) - trace := utiltrace.New("RealFIFO Pop Process", - utiltrace.Field{Key: "ID", Value: id}, - utiltrace.Field{Key: "Depth", Value: len(f.items)}, - utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) + fields := []utiltrace.Field{ + {Key: "ID", Value: id}, + {Key: "Depth", Value: len(f.items)}, + {Key: "Reason", Value: "slow event handlers blocking the queue"}, + } + if name := f.identifier.Name(); len(name) > 0 { + fields = append(fields, utiltrace.Field{Key: "Name", Value: name}) + } + if gvr := f.identifier.GroupVersionResource(); !gvr.Empty() { + fields = append(fields, utiltrace.Field{Key: "Resource", Value: gvr}) + } + trace := utiltrace.New("RealFIFO Pop Process", fields...) defer trace.LogIfLong(100 * time.Millisecond) } + f.metrics.numberOfQueuedItem.Set(float64(len(f.items))) // Process the item, this may unlock the lock, and allow other goroutines to add items to the queue. err := f.whileProcessing_locked(func() error { @@ -459,13 +485,22 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc // https://github.com/kubernetes/kubernetes/issues/103789 if len(f.items) > 10 { id, _ := f.keyOf(deltas[0]) - trace := utiltrace.New("RealFIFO PopBatch Process", - utiltrace.Field{Key: "ID", Value: id}, - utiltrace.Field{Key: "Depth", Value: len(f.items)}, - utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}, - utiltrace.Field{Key: "BatchSize", Value: len(deltas)}) + fields := []utiltrace.Field{ + {Key: "ID", Value: id}, + {Key: "Depth", Value: len(f.items)}, + {Key: "Reason", Value: "slow event handlers blocking the queue"}, + {Key: "BatchSize", Value: len(deltas)}, + } + if name := f.identifier.Name(); len(name) > 0 { + fields = append(fields, utiltrace.Field{Key: "Name", Value: name}) + } + if gvr := f.identifier.GroupVersionResource(); !gvr.Empty() { + fields = append(fields, utiltrace.Field{Key: "Resource", Value: gvr}) + } + trace := utiltrace.New("RealFIFO PopBatch Process", fields...) defer trace.LogIfLong(min(100*time.Millisecond*time.Duration(len(deltas)), time.Second)) } + f.metrics.numberOfQueuedItem.Set(float64(len(f.items))) if len(deltas) == 1 { return f.whileProcessing_locked(func() error { @@ -711,6 +746,8 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { batchSize: defaultBatchSize, emitAtomicEvents: opts.AtomicEvents, unlockWhileProcessing: opts.UnlockWhileProcessing, + identifier: opts.Identifier, + metrics: newFIFOMetrics(opts.Identifier, opts.MetricsProvider), } f.cond.L = &f.lock