Merge pull request #127635 from antoninbas/use-generics-for-queue-metrics

Use generics for workqueue metrics
This commit is contained in:
Kubernetes Prow Robot
2024-10-08 10:20:22 +01:00
committed by GitHub
8 changed files with 99 additions and 52 deletions

View File

@@ -77,6 +77,17 @@ issues:
text: comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|exported (.+) should have comment( \(or a comment on this block\))? or be unexported|package comment should be of the form "(.+)...|comment on exported (.+) should be of the form "(.+)...|should have a package comment text: comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|exported (.+) should have comment( \(or a comment on this block\))? or be unexported|package comment should be of the form "(.+)...|comment on exported (.+) should be of the form "(.+)...|should have a package comment
path-except: cmd/kubeadm path-except: cmd/kubeadm
# The unused linter that comes from staticcheck currently does not handle types which implement
# a generic interface. The linter incorrectly reports the implementations of unexported
# interface methods as unused. See https://github.com/dominikh/go-tools/issues/1294.
# Rather than exporting the interface methods, which makes the error go away but changes the
# semantics of the code, we ignore this error for affected files.
# This can be removed when the staticcheck implementation of this rule is fixed, which may
# depend on https://github.com/golang/go/issues/63982.
- linters:
- unused
path: staging/src/k8s.io/client-go/util/workqueue/metrics.go
linters: linters:
disable-all: false disable-all: false
enable: # please keep this alphabetized enable: # please keep this alphabetized

View File

@@ -77,6 +77,17 @@ issues:
text: comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|exported (.+) should have comment( \(or a comment on this block\))? or be unexported|package comment should be of the form "(.+)...|comment on exported (.+) should be of the form "(.+)...|should have a package comment text: comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|exported (.+) should have comment( \(or a comment on this block\))? or be unexported|package comment should be of the form "(.+)...|comment on exported (.+) should be of the form "(.+)...|should have a package comment
path-except: cmd/kubeadm path-except: cmd/kubeadm
# The unused linter that comes from staticcheck currently does not handle types which implement
# a generic interface. The linter incorrectly reports the implementations of unexported
# interface methods as unused. See https://github.com/dominikh/go-tools/issues/1294.
# Rather than exporting the interface methods, which makes the error go away but changes the
# semantics of the code, we ignore this error for affected files.
# This can be removed when the staticcheck implementation of this rule is fixed, which may
# depend on https://github.com/golang/go/issues/63982.
- linters:
- unused
path: staging/src/k8s.io/client-go/util/workqueue/metrics.go
# The following issues were deemed "might be worth fixing, needs to be # The following issues were deemed "might be worth fixing, needs to be
# decided on a case-by-case basis". This was initially decided by a # decided on a case-by-case basis". This was initially decided by a
# majority of the developers who voted in # majority of the developers who voted in

View File

@@ -83,6 +83,17 @@ issues:
text: comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|exported (.+) should have comment( \(or a comment on this block\))? or be unexported|package comment should be of the form "(.+)...|comment on exported (.+) should be of the form "(.+)...|should have a package comment text: comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|exported (.+) should have comment( \(or a comment on this block\))? or be unexported|package comment should be of the form "(.+)...|comment on exported (.+) should be of the form "(.+)...|should have a package comment
path-except: cmd/kubeadm path-except: cmd/kubeadm
# The unused linter that comes from staticcheck currently does not handle types which implement
# a generic interface. The linter incorrectly reports the implementations of unexported
# interface methods as unused. See https://github.com/dominikh/go-tools/issues/1294.
# Rather than exporting the interface methods, which makes the error go away but changes the
# semantics of the code, we ignore this error for affected files.
# This can be removed when the staticcheck implementation of this rule is fixed, which may
# depend on https://github.com/golang/go/issues/63982.
- linters:
- unused
path: staging/src/k8s.io/client-go/util/workqueue/metrics.go
# The following issues were deemed "might be worth fixing, needs to be # The following issues were deemed "might be worth fixing, needs to be
# decided on a case-by-case basis". This was initially decided by a # decided on a case-by-case basis". This was initially decided by a
# majority of the developers who voted in # majority of the developers who voted in

View File

@@ -86,6 +86,17 @@ issues:
text: comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|exported (.+) should have comment( \(or a comment on this block\))? or be unexported|package comment should be of the form "(.+)...|comment on exported (.+) should be of the form "(.+)...|should have a package comment text: comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form|exported (.+) should have comment( \(or a comment on this block\))? or be unexported|package comment should be of the form "(.+)...|comment on exported (.+) should be of the form "(.+)...|should have a package comment
path-except: cmd/kubeadm path-except: cmd/kubeadm
# The unused linter that comes from staticcheck currently does not handle types which implement
# a generic interface. The linter incorrectly reports the implementations of unexported
# interface methods as unused. See https://github.com/dominikh/go-tools/issues/1294.
# Rather than exporting the interface methods, which makes the error go away but changes the
# semantics of the code, we ignore this error for affected files.
# This can be removed when the staticcheck implementation of this rule is fixed, which may
# depend on https://github.com/golang/go/issues/63982.
- linters:
- unused
path: staging/src/k8s.io/client-go/util/workqueue/metrics.go
{{- if not .Hints}} {{- if not .Hints}}
# The following issues were deemed "might be worth fixing, needs to be # The following issues were deemed "might be worth fixing, needs to be

View File

@@ -26,10 +26,10 @@ import (
// This file provides abstractions for setting the provider (e.g., prometheus) // This file provides abstractions for setting the provider (e.g., prometheus)
// of metrics. // of metrics.
type queueMetrics interface { type queueMetrics[T comparable] interface {
add(item t) add(item T)
get(item t) get(item T)
done(item t) done(item T)
updateUnfinishedWork() updateUnfinishedWork()
} }
@@ -70,7 +70,7 @@ func (noopMetric) Set(float64) {}
func (noopMetric) Observe(float64) {} func (noopMetric) Observe(float64) {}
// defaultQueueMetrics expects the caller to lock before setting any metrics. // defaultQueueMetrics expects the caller to lock before setting any metrics.
type defaultQueueMetrics struct { type defaultQueueMetrics[T comparable] struct {
clock clock.Clock clock clock.Clock
// current depth of a workqueue // current depth of a workqueue
@@ -81,15 +81,15 @@ type defaultQueueMetrics struct {
latency HistogramMetric latency HistogramMetric
// how long processing an item from a workqueue takes // how long processing an item from a workqueue takes
workDuration HistogramMetric workDuration HistogramMetric
addTimes map[t]time.Time addTimes map[T]time.Time
processingStartTimes map[t]time.Time processingStartTimes map[T]time.Time
// how long have current threads been working? // how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric longestRunningProcessor SettableGaugeMetric
} }
func (m *defaultQueueMetrics) add(item t) { func (m *defaultQueueMetrics[T]) add(item T) {
if m == nil { if m == nil {
return return
} }
@@ -101,7 +101,7 @@ func (m *defaultQueueMetrics) add(item t) {
} }
} }
func (m *defaultQueueMetrics) get(item t) { func (m *defaultQueueMetrics[T]) get(item T) {
if m == nil { if m == nil {
return return
} }
@@ -114,7 +114,7 @@ func (m *defaultQueueMetrics) get(item t) {
} }
} }
func (m *defaultQueueMetrics) done(item t) { func (m *defaultQueueMetrics[T]) done(item T) {
if m == nil { if m == nil {
return return
} }
@@ -125,7 +125,7 @@ func (m *defaultQueueMetrics) done(item t) {
} }
} }
func (m *defaultQueueMetrics) updateUnfinishedWork() { func (m *defaultQueueMetrics[T]) updateUnfinishedWork() {
// Note that a summary metric would be better for this, but prometheus // Note that a summary metric would be better for this, but prometheus
// doesn't seem to have non-hacky ways to reset the summary metrics. // doesn't seem to have non-hacky ways to reset the summary metrics.
var total float64 var total float64
@@ -141,15 +141,15 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
m.longestRunningProcessor.Set(oldest) m.longestRunningProcessor.Set(oldest)
} }
type noMetrics struct{} type noMetrics[T any] struct{}
func (noMetrics) add(item t) {} func (noMetrics[T]) add(item T) {}
func (noMetrics) get(item t) {} func (noMetrics[T]) get(item T) {}
func (noMetrics) done(item t) {} func (noMetrics[T]) done(item T) {}
func (noMetrics) updateUnfinishedWork() {} func (noMetrics[T]) updateUnfinishedWork() {}
// Gets the time since the specified start in seconds. // Gets the time since the specified start in seconds.
func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 { func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 {
return m.clock.Since(start).Seconds() return m.clock.Since(start).Seconds()
} }
@@ -210,28 +210,15 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{} return noopMetric{}
} }
var globalMetricsFactory = queueMetricsFactory{ var globalMetricsProvider MetricsProvider = noopMetricsProvider{}
metricsProvider: noopMetricsProvider{},
}
type queueMetricsFactory struct { var setGlobalMetricsProviderOnce sync.Once
metricsProvider MetricsProvider
onlyOnce sync.Once func newQueueMetrics[T comparable](mp MetricsProvider, name string, clock clock.Clock) queueMetrics[T] {
}
func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
f.onlyOnce.Do(func() {
f.metricsProvider = mp
})
}
func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
mp := f.metricsProvider
if len(name) == 0 || mp == (noopMetricsProvider{}) { if len(name) == 0 || mp == (noopMetricsProvider{}) {
return noMetrics{} return noMetrics[T]{}
} }
return &defaultQueueMetrics{ return &defaultQueueMetrics[T]{
clock: clock, clock: clock,
depth: mp.NewDepthMetric(name), depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name), adds: mp.NewAddsMetric(name),
@@ -239,8 +226,8 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
workDuration: mp.NewWorkDurationMetric(name), workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
addTimes: map[t]time.Time{}, addTimes: map[T]time.Time{},
processingStartTimes: map[t]time.Time{}, processingStartTimes: map[T]time.Time{},
} }
} }
@@ -251,7 +238,7 @@ func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
} }
if provider == nil { if provider == nil {
provider = globalMetricsFactory.metricsProvider provider = globalMetricsProvider
} }
return &defaultRetryMetrics{ return &defaultRetryMetrics{
@@ -262,5 +249,7 @@ func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
// SetProvider sets the metrics provider for all subsequently created work // SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect. // queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) { func SetProvider(metricsProvider MetricsProvider) {
globalMetricsFactory.setProvider(metricsProvider) setGlobalMetricsProviderOnce.Do(func() {
globalMetricsProvider = metricsProvider
})
} }

View File

@@ -30,9 +30,9 @@ type testMetrics struct {
updateCalled chan<- struct{} updateCalled chan<- struct{}
} }
func (m *testMetrics) add(item t) { m.added++ } func (m *testMetrics) add(item any) { m.added++ }
func (m *testMetrics) get(item t) { m.gotten++ } func (m *testMetrics) get(item any) { m.gotten++ }
func (m *testMetrics) done(item t) { m.finished++ } func (m *testMetrics) done(item any) { m.finished++ }
func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} } func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
func TestMetricShutdown(t *testing.T) { func TestMetricShutdown(t *testing.T) {

View File

@@ -138,13 +138,9 @@ func NewNamed(name string) *Type {
// newQueueWithConfig constructs a new named workqueue // newQueueWithConfig constructs a new named workqueue
// with the ability to customize different properties for testing purposes // with the ability to customize different properties for testing purposes
func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] { func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {
var metricsFactory *queueMetricsFactory metricsProvider := globalMetricsProvider
if config.MetricsProvider != nil { if config.MetricsProvider != nil {
metricsFactory = &queueMetricsFactory{ metricsProvider = config.MetricsProvider
metricsProvider: config.MetricsProvider,
}
} else {
metricsFactory = &globalMetricsFactory
} }
if config.Clock == nil { if config.Clock == nil {
@@ -158,12 +154,12 @@ func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod t
return newQueue( return newQueue(
config.Clock, config.Clock,
config.Queue, config.Queue,
metricsFactory.newQueueMetrics(config.Name, config.Clock), newQueueMetrics[T](metricsProvider, config.Name, config.Clock),
updatePeriod, updatePeriod,
) )
} }
func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics, updatePeriod time.Duration) *Typed[T] { func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics[T], updatePeriod time.Duration) *Typed[T] {
t := &Typed[T]{ t := &Typed[T]{
clock: c, clock: c,
queue: queue, queue: queue,
@@ -176,7 +172,7 @@ func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMet
// Don't start the goroutine for a type of noMetrics so we don't consume // Don't start the goroutine for a type of noMetrics so we don't consume
// resources unnecessarily // resources unnecessarily
if _, ok := metrics.(noMetrics); !ok { if _, ok := metrics.(noMetrics[T]); !ok {
go t.updateUnfinishedWorkLoop() go t.updateUnfinishedWorkLoop()
} }
@@ -209,7 +205,7 @@ type Typed[t comparable] struct {
shuttingDown bool shuttingDown bool
drain bool drain bool
metrics queueMetrics metrics queueMetrics[t]
unfinishedWorkUpdatePeriod time.Duration unfinishedWorkUpdatePeriod time.Duration
clock clock.WithTicker clock clock.WithTicker

View File

@@ -17,6 +17,7 @@ limitations under the License.
package workqueue_test package workqueue_test
import ( import (
"fmt"
"runtime" "runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -460,3 +461,20 @@ func mustGarbageCollect(t *testing.T, i interface{}) {
} }
}) })
} }
func BenchmarkQueue(b *testing.B) {
keys := make([]string, 100)
for idx := range keys {
keys[idx] = fmt.Sprintf("key-%d", idx)
}
for i := 0; i < b.N; i++ {
b.StopTimer()
q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[string]{})
b.StartTimer()
for j := 0; j < 100; j++ {
q.Add(keys[j])
key, _ := q.Get()
q.Done(key)
}
}
}