From 1aec7568e111f5855121e3afacacf431e5f95948 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Wed, 25 Sep 2024 14:35:03 -0700 Subject: [PATCH] Use generics for workqueue metrics The workqueue implementation was recently updated to be strongly typed, using Go generics. However the metrics implementation was not updated, and continued using interface{}. This translated to unnecessary memory allocations when invoking the queueMetrics interface methods to track queue operation. We can avoid these extra heap allocations by using generics for the metrics implementation as well. Signed-off-by: Antonin Bas --- .../client-go/util/workqueue/metrics.go | 67 ++++++++----------- .../client-go/util/workqueue/metrics_test.go | 6 +- .../k8s.io/client-go/util/workqueue/queue.go | 16 ++--- .../client-go/util/workqueue/queue_test.go | 18 +++++ 4 files changed, 55 insertions(+), 52 deletions(-) diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/util/workqueue/metrics.go index f012ccc5548..4400cb65e1e 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics.go @@ -26,10 +26,10 @@ import ( // This file provides abstractions for setting the provider (e.g., prometheus) // of metrics. -type queueMetrics interface { - add(item t) - get(item t) - done(item t) +type queueMetrics[T comparable] interface { + add(item T) + get(item T) + done(item T) updateUnfinishedWork() } @@ -70,7 +70,7 @@ func (noopMetric) Set(float64) {} func (noopMetric) Observe(float64) {} // defaultQueueMetrics expects the caller to lock before setting any metrics. -type defaultQueueMetrics struct { +type defaultQueueMetrics[T comparable] struct { clock clock.Clock // current depth of a workqueue @@ -81,15 +81,15 @@ type defaultQueueMetrics struct { latency HistogramMetric // how long processing an item from a workqueue takes workDuration HistogramMetric - addTimes map[t]time.Time - processingStartTimes map[t]time.Time + addTimes map[T]time.Time + processingStartTimes map[T]time.Time // how long have current threads been working? unfinishedWorkSeconds SettableGaugeMetric longestRunningProcessor SettableGaugeMetric } -func (m *defaultQueueMetrics) add(item t) { +func (m *defaultQueueMetrics[T]) add(item T) { if m == nil { return } @@ -101,7 +101,7 @@ func (m *defaultQueueMetrics) add(item t) { } } -func (m *defaultQueueMetrics) get(item t) { +func (m *defaultQueueMetrics[T]) get(item T) { if m == nil { return } @@ -114,7 +114,7 @@ func (m *defaultQueueMetrics) get(item t) { } } -func (m *defaultQueueMetrics) done(item t) { +func (m *defaultQueueMetrics[T]) done(item T) { if m == nil { return } @@ -125,7 +125,7 @@ func (m *defaultQueueMetrics) done(item t) { } } -func (m *defaultQueueMetrics) updateUnfinishedWork() { +func (m *defaultQueueMetrics[T]) updateUnfinishedWork() { // Note that a summary metric would be better for this, but prometheus // doesn't seem to have non-hacky ways to reset the summary metrics. var total float64 @@ -141,15 +141,15 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() { m.longestRunningProcessor.Set(oldest) } -type noMetrics struct{} +type noMetrics[T any] struct{} -func (noMetrics) add(item t) {} -func (noMetrics) get(item t) {} -func (noMetrics) done(item t) {} -func (noMetrics) updateUnfinishedWork() {} +func (noMetrics[T]) add(item T) {} +func (noMetrics[T]) get(item T) {} +func (noMetrics[T]) done(item T) {} +func (noMetrics[T]) updateUnfinishedWork() {} // Gets the time since the specified start in seconds. -func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 { +func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 { return m.clock.Since(start).Seconds() } @@ -210,28 +210,15 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { return noopMetric{} } -var globalMetricsFactory = queueMetricsFactory{ - metricsProvider: noopMetricsProvider{}, -} +var globalMetricsProvider MetricsProvider = noopMetricsProvider{} -type queueMetricsFactory struct { - metricsProvider MetricsProvider +var setGlobalMetricsProviderOnce sync.Once - onlyOnce sync.Once -} - -func (f *queueMetricsFactory) setProvider(mp MetricsProvider) { - f.onlyOnce.Do(func() { - f.metricsProvider = mp - }) -} - -func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics { - mp := f.metricsProvider +func newQueueMetrics[T comparable](mp MetricsProvider, name string, clock clock.Clock) queueMetrics[T] { if len(name) == 0 || mp == (noopMetricsProvider{}) { - return noMetrics{} + return noMetrics[T]{} } - return &defaultQueueMetrics{ + return &defaultQueueMetrics[T]{ clock: clock, depth: mp.NewDepthMetric(name), adds: mp.NewAddsMetric(name), @@ -239,8 +226,8 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu workDuration: mp.NewWorkDurationMetric(name), unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), - addTimes: map[t]time.Time{}, - processingStartTimes: map[t]time.Time{}, + addTimes: map[T]time.Time{}, + processingStartTimes: map[T]time.Time{}, } } @@ -251,7 +238,7 @@ func newRetryMetrics(name string, provider MetricsProvider) retryMetrics { } if provider == nil { - provider = globalMetricsFactory.metricsProvider + provider = globalMetricsProvider } return &defaultRetryMetrics{ @@ -262,5 +249,7 @@ func newRetryMetrics(name string, provider MetricsProvider) retryMetrics { // SetProvider sets the metrics provider for all subsequently created work // queues. Only the first call has an effect. func SetProvider(metricsProvider MetricsProvider) { - globalMetricsFactory.setProvider(metricsProvider) + setGlobalMetricsProviderOnce.Do(func() { + globalMetricsProvider = metricsProvider + }) } diff --git a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go index 552ec7a8faf..945b762d59f 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/metrics_test.go @@ -30,9 +30,9 @@ type testMetrics struct { updateCalled chan<- struct{} } -func (m *testMetrics) add(item t) { m.added++ } -func (m *testMetrics) get(item t) { m.gotten++ } -func (m *testMetrics) done(item t) { m.finished++ } +func (m *testMetrics) add(item any) { m.added++ } +func (m *testMetrics) get(item any) { m.gotten++ } +func (m *testMetrics) done(item any) { m.finished++ } func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } func TestMetricShutdown(t *testing.T) { diff --git a/staging/src/k8s.io/client-go/util/workqueue/queue.go b/staging/src/k8s.io/client-go/util/workqueue/queue.go index ff715482c11..226d9f9d073 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/queue.go @@ -138,13 +138,9 @@ func NewNamed(name string) *Type { // newQueueWithConfig constructs a new named workqueue // with the ability to customize different properties for testing purposes func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] { - var metricsFactory *queueMetricsFactory + metricsProvider := globalMetricsProvider if config.MetricsProvider != nil { - metricsFactory = &queueMetricsFactory{ - metricsProvider: config.MetricsProvider, - } - } else { - metricsFactory = &globalMetricsFactory + metricsProvider = config.MetricsProvider } if config.Clock == nil { @@ -158,12 +154,12 @@ func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod t return newQueue( config.Clock, config.Queue, - metricsFactory.newQueueMetrics(config.Name, config.Clock), + newQueueMetrics[T](metricsProvider, config.Name, config.Clock), updatePeriod, ) } -func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics, updatePeriod time.Duration) *Typed[T] { +func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics[T], updatePeriod time.Duration) *Typed[T] { t := &Typed[T]{ clock: c, queue: queue, @@ -176,7 +172,7 @@ func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMet // Don't start the goroutine for a type of noMetrics so we don't consume // resources unnecessarily - if _, ok := metrics.(noMetrics); !ok { + if _, ok := metrics.(noMetrics[T]); !ok { go t.updateUnfinishedWorkLoop() } @@ -209,7 +205,7 @@ type Typed[t comparable] struct { shuttingDown bool drain bool - metrics queueMetrics + metrics queueMetrics[t] unfinishedWorkUpdatePeriod time.Duration clock clock.WithTicker diff --git a/staging/src/k8s.io/client-go/util/workqueue/queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/queue_test.go index 6cec86ee6ee..7ede527fabe 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/queue_test.go @@ -17,6 +17,7 @@ limitations under the License. package workqueue_test import ( + "fmt" "runtime" "sync" "sync/atomic" @@ -460,3 +461,20 @@ func mustGarbageCollect(t *testing.T, i interface{}) { } }) } + +func BenchmarkQueue(b *testing.B) { + keys := make([]string, 100) + for idx := range keys { + keys[idx] = fmt.Sprintf("key-%d", idx) + } + for i := 0; i < b.N; i++ { + b.StopTimer() + q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{}) + b.StartTimer() + for j := 0; j < 100; j++ { + q.Add(keys[j]) + key, _ := q.Get() + q.Done(key) + } + } +}