Use generics for workqueue metrics

The workqueue implementation was recently updated to be strongly typed,
using Go generics. However, the metrics implementation was not updated
and continued to use interface{}. This translated into unnecessary memory
allocations when invoking the queueMetrics interface methods to track
queue operations. We can avoid these extra heap allocations by using
generics for the metrics implementation as well.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>

Kubernetes-commit: 1aec7568e111f5855121e3afacacf431e5f95948
Author: Antonin Bas
Date: 2024-09-25 14:35:03 -07:00
Committer: Kubernetes Publisher
Parent: ca4a13f6de
Commit: 5b31113588
4 changed files with 55 additions and 52 deletions
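For context, here is a minimal standalone sketch of the allocation difference described in the commit message. It is not taken from this commit; the metricsAny, metricsTyped, countAny, and countTyped names are invented for illustration. Passing a concrete item through an interface{} parameter generally boxes it onto the heap, while a generic parameter keeps the concrete type and can avoid that allocation.

package main

// metricsAny mirrors the old shape: items are passed as interface{}.
type metricsAny interface {
	add(item interface{})
}

// metricsTyped mirrors the new shape: items keep their concrete type T.
type metricsTyped[T comparable] interface {
	add(item T)
}

type countAny struct{ n int }

func (m *countAny) add(item interface{}) { m.n++ } // item was boxed by the caller

type countTyped[T comparable] struct{ n int }

func (m *countTyped[T]) add(item T) { m.n++ } // item keeps its concrete type

func main() {
	key := "key-0"

	var a metricsAny = &countAny{}
	a.add(key) // string -> interface{} conversion typically allocates

	var g metricsTyped[string] = &countTyped[string]{}
	g.add(key) // the string is passed directly; no interface allocation
}

This is, in miniature, the same change the diff below makes to queueMetrics, defaultQueueMetrics, and noMetrics.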

File 1 of 4:

@@ -26,10 +26,10 @@ import (
 // This file provides abstractions for setting the provider (e.g., prometheus)
 // of metrics.
-type queueMetrics interface {
-	add(item t)
-	get(item t)
-	done(item t)
+type queueMetrics[T comparable] interface {
+	add(item T)
+	get(item T)
+	done(item T)
 	updateUnfinishedWork()
 }
@@ -70,7 +70,7 @@ func (noopMetric) Set(float64) {}
 func (noopMetric) Observe(float64) {}
 // defaultQueueMetrics expects the caller to lock before setting any metrics.
-type defaultQueueMetrics struct {
+type defaultQueueMetrics[T comparable] struct {
 	clock clock.Clock
 	// current depth of a workqueue
@@ -81,15 +81,15 @@ type defaultQueueMetrics struct {
 	latency HistogramMetric
 	// how long processing an item from a workqueue takes
 	workDuration HistogramMetric
-	addTimes map[t]time.Time
-	processingStartTimes map[t]time.Time
+	addTimes map[T]time.Time
+	processingStartTimes map[T]time.Time
 	// how long have current threads been working?
 	unfinishedWorkSeconds SettableGaugeMetric
 	longestRunningProcessor SettableGaugeMetric
 }
-func (m *defaultQueueMetrics) add(item t) {
+func (m *defaultQueueMetrics[T]) add(item T) {
 	if m == nil {
 		return
 	}
@@ -101,7 +101,7 @@ func (m *defaultQueueMetrics) add(item t) {
 	}
 }
-func (m *defaultQueueMetrics) get(item t) {
+func (m *defaultQueueMetrics[T]) get(item T) {
 	if m == nil {
 		return
 	}
@@ -114,7 +114,7 @@ func (m *defaultQueueMetrics) get(item t) {
 	}
 }
-func (m *defaultQueueMetrics) done(item t) {
+func (m *defaultQueueMetrics[T]) done(item T) {
 	if m == nil {
 		return
 	}
@@ -125,7 +125,7 @@ func (m *defaultQueueMetrics) done(item t) {
 	}
 }
-func (m *defaultQueueMetrics) updateUnfinishedWork() {
+func (m *defaultQueueMetrics[T]) updateUnfinishedWork() {
 	// Note that a summary metric would be better for this, but prometheus
 	// doesn't seem to have non-hacky ways to reset the summary metrics.
 	var total float64
@@ -141,15 +141,15 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
 	m.longestRunningProcessor.Set(oldest)
 }
-type noMetrics struct{}
+type noMetrics[T any] struct{}
-func (noMetrics) add(item t) {}
-func (noMetrics) get(item t) {}
-func (noMetrics) done(item t) {}
-func (noMetrics) updateUnfinishedWork() {}
+func (noMetrics[T]) add(item T) {}
+func (noMetrics[T]) get(item T) {}
+func (noMetrics[T]) done(item T) {}
+func (noMetrics[T]) updateUnfinishedWork() {}
 // Gets the time since the specified start in seconds.
-func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
+func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 {
 	return m.clock.Since(start).Seconds()
 }
@@ -210,28 +210,15 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
 	return noopMetric{}
 }
-var globalMetricsFactory = queueMetricsFactory{
-	metricsProvider: noopMetricsProvider{},
-}
+var globalMetricsProvider MetricsProvider = noopMetricsProvider{}
-type queueMetricsFactory struct {
-	metricsProvider MetricsProvider
+var setGlobalMetricsProviderOnce sync.Once
-	onlyOnce sync.Once
-}
-func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
-	f.onlyOnce.Do(func() {
-		f.metricsProvider = mp
-	})
-}
-func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
-	mp := f.metricsProvider
+func newQueueMetrics[T comparable](mp MetricsProvider, name string, clock clock.Clock) queueMetrics[T] {
 	if len(name) == 0 || mp == (noopMetricsProvider{}) {
-		return noMetrics{}
+		return noMetrics[T]{}
 	}
-	return &defaultQueueMetrics{
+	return &defaultQueueMetrics[T]{
 		clock: clock,
 		depth: mp.NewDepthMetric(name),
 		adds: mp.NewAddsMetric(name),
@@ -239,8 +226,8 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
 		workDuration: mp.NewWorkDurationMetric(name),
 		unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
 		longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
-		addTimes: map[t]time.Time{},
-		processingStartTimes: map[t]time.Time{},
+		addTimes: map[T]time.Time{},
+		processingStartTimes: map[T]time.Time{},
 	}
 }
@@ -251,7 +238,7 @@ func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
 	}
 	if provider == nil {
-		provider = globalMetricsFactory.metricsProvider
+		provider = globalMetricsProvider
 	}
 	return &defaultRetryMetrics{
@@ -262,5 +249,7 @@ func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
 // SetProvider sets the metrics provider for all subsequently created work
 // queues. Only the first call has an effect.
 func SetProvider(metricsProvider MetricsProvider) {
-	globalMetricsFactory.setProvider(metricsProvider)
+	setGlobalMetricsProviderOnce.Do(func() {
+		globalMetricsProvider = metricsProvider
+	})
 }

File 2 of 4:

@@ -30,9 +30,9 @@ type testMetrics struct {
 	updateCalled chan<- struct{}
 }
-func (m *testMetrics) add(item t) { m.added++ }
-func (m *testMetrics) get(item t) { m.gotten++ }
-func (m *testMetrics) done(item t) { m.finished++ }
+func (m *testMetrics) add(item any) { m.added++ }
+func (m *testMetrics) get(item any) { m.gotten++ }
+func (m *testMetrics) done(item any) { m.finished++ }
 func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
 func TestMetricShutdown(t *testing.T) {

File 3 of 4:

@@ -138,13 +138,9 @@ func NewNamed(name string) *Type {
 // newQueueWithConfig constructs a new named workqueue
 // with the ability to customize different properties for testing purposes
 func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {
-	var metricsFactory *queueMetricsFactory
+	metricsProvider := globalMetricsProvider
 	if config.MetricsProvider != nil {
-		metricsFactory = &queueMetricsFactory{
-			metricsProvider: config.MetricsProvider,
-		}
-	} else {
-		metricsFactory = &globalMetricsFactory
+		metricsProvider = config.MetricsProvider
 	}
 	if config.Clock == nil {
@@ -158,12 +154,12 @@ func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod t
 	return newQueue(
 		config.Clock,
 		config.Queue,
-		metricsFactory.newQueueMetrics(config.Name, config.Clock),
+		newQueueMetrics[T](metricsProvider, config.Name, config.Clock),
 		updatePeriod,
 	)
 }
-func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics, updatePeriod time.Duration) *Typed[T] {
+func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics[T], updatePeriod time.Duration) *Typed[T] {
 	t := &Typed[T]{
 		clock: c,
 		queue: queue,
@@ -176,7 +172,7 @@ func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMet
 	// Don't start the goroutine for a type of noMetrics so we don't consume
 	// resources unnecessarily
-	if _, ok := metrics.(noMetrics); !ok {
+	if _, ok := metrics.(noMetrics[T]); !ok {
 		go t.updateUnfinishedWorkLoop()
 	}
@@ -209,7 +205,7 @@ type Typed[t comparable] struct {
 	shuttingDown bool
 	drain bool
-	metrics queueMetrics
+	metrics queueMetrics[t]
 	unfinishedWorkUpdatePeriod time.Duration
 	clock clock.WithTicker

File 4 of 4:

@@ -17,6 +17,7 @@ limitations under the License.
 package workqueue_test
 import (
+	"fmt"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -460,3 +461,20 @@ func mustGarbageCollect(t *testing.T, i interface{}) {
 		}
 	})
 }
+func BenchmarkQueue(b *testing.B) {
+	keys := make([]string, 100)
+	for idx := range keys {
+		keys[idx] = fmt.Sprintf("key-%d", idx)
+	}
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{})
+		b.StartTimer()
+		for j := 0; j < 100; j++ {
+			q.Add(keys[j])
+			key, _ := q.Get()
+			q.Done(key)
+		}
+	}
+}
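To gauge the impact locally, one option (a suggestion, not part of the commit) is to run the new benchmark with allocation reporting, e.g. go test -run=NONE -bench=BenchmarkQueue -benchmem from the workqueue package directory. With the generic metrics path, allocs/op would be expected to drop relative to the interface{}-based version, since Add, Get, and Done no longer convert each item to interface{} when recording metrics.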