Allow setting per-queue metrics providers

Kubernetes-commit: da71811514ac6e8fedd265831fed71f897b97781
This commit is contained in:
austin ce 2022-11-29 20:47:33 -05:00 committed by Kubernetes Publisher
parent 23d016c390
commit 98d0d1a9ba
5 changed files with 178 additions and 16 deletions

View File

@ -36,8 +36,8 @@ type DelayingInterface interface {
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewNamedDelayingQueue instead.
func NewDelayingQueue() DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
func NewDelayingQueue(opts ...QueueOption) DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, "", opts...)
}
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
@ -47,14 +47,14 @@ func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface
}
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
func NewNamedDelayingQueue(name string) DelayingInterface {
func NewNamedDelayingQueue(name string, opts ...QueueOption) DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
}
// NewDelayingQueueWithCustomClock constructs a new named workqueue
// with ability to inject real or fake clock for testing purposes
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
return newDelayingQueue(clock, NewNamed(name), name)
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string, opts ...QueueOption) DelayingInterface {
return newDelayingQueue(clock, NewNamed(name, opts...), name)
}
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {

View File

@ -276,3 +276,113 @@ func TestMetrics(t *testing.T) {
t.Errorf("expected %v, got %v", e, a)
}
}
func TestPerQueueMetrics(t *testing.T) {
// TODO(austince): refactor TestMetrics to make this test just another case
mp := testMetricsProvider{}
t0 := time.Unix(0, 0)
c := testingclock.NewFakeClock(t0)
q := newNamedQueueWithCustomClock(c, "test", time.Millisecond, WithMetricsProvider(&mp))
defer q.ShutDown()
for !c.HasWaiters() {
// Wait for the go routine to call NewTicker()
time.Sleep(time.Millisecond)
}
q.Add("foo")
if e, a := 1.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(50 * time.Microsecond)
// Start processing
i, _ := q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 5e-05, mp.latency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Add it back while processing; multiple adds of the same item are
// de-duped.
q.Add(i)
q.Add(i)
q.Add(i)
q.Add(i)
q.Add(i)
if e, a := 2.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(25 * time.Microsecond)
// Finish it up
q.Done(i)
if e, a := 2.5e-05, mp.duration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// It should be back on the queue
i, _ = q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}
if e, a := 2.5e-05, mp.latency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// use a channel to ensure we don't look at the metric before it's
// been set.
ch := make(chan struct{}, 1)
longestCh := make(chan struct{}, 1)
mp.unfinished.notifyCh = ch
mp.longest.notifyCh = longestCh
c.Step(time.Millisecond)
<-ch
mp.unfinished.notifyCh = nil
if e, a := .001, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
<-longestCh
mp.longest.notifyCh = nil
if e, a := .001, mp.longest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Finish that one up
q.Done(i)
if e, a := .001, mp.duration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}

35
util/workqueue/options.go Normal file
View File

@ -0,0 +1,35 @@
package workqueue
type QueueConfig struct {
metricsProvider MetricsProvider
}
// QueueOption is an interface for applying queue configuration options.
type QueueOption interface {
apply(*QueueConfig)
}
type optionFunc func(*QueueConfig)
func (fn optionFunc) apply(config *QueueConfig) {
fn(config)
}
var _ QueueOption = optionFunc(nil)
// NewConfig creates a new QueueConfig and applies all the given options.
func NewConfig(opts ...QueueOption) *QueueConfig {
config := &QueueConfig{}
for _, o := range opts {
o.apply(config)
}
return config
}
// WithMetricsProvider allows specifying a metrics provider to use for the queue
// instead of the global provider.
func WithMetricsProvider(provider MetricsProvider) QueueOption {
return optionFunc(func(config *QueueConfig) {
config.metricsProvider = provider
})
}

View File

@ -34,16 +34,33 @@ type Interface interface {
}
// New constructs a new work queue (see the package comment).
func New() *Type {
return NewNamed("")
func New(opts ...QueueOption) *Type {
return NewNamed("", opts...)
}
func NewNamed(name string) *Type {
rc := clock.RealClock{}
func NewNamed(name string, opts ...QueueOption) *Type {
return newNamedQueueWithCustomClock(clock.RealClock{}, name, defaultUnfinishedWorkUpdatePeriod, opts...)
}
// newNamedQueueWithCustomClock constructs a new named workqueue
// with ability to inject real or fake clock for testing purposes
// TODO(austince): should WithUpdatePeriod be a QueueOption? Could this func then be public?
func newNamedQueueWithCustomClock(clock clock.WithTicker, name string, updatePeriod time.Duration, opts ...QueueOption) *Type {
config := NewConfig(opts...)
var metricsFactory *queueMetricsFactory
if config.metricsProvider != nil {
metricsFactory = &queueMetricsFactory{
metricsProvider: config.metricsProvider,
}
} else {
metricsFactory = &globalMetricsFactory
}
return newQueue(
rc,
globalMetricsFactory.newQueueMetrics(name, rc),
defaultUnfinishedWorkUpdatePeriod,
clock,
metricsFactory.newQueueMetrics(name, clock),
updatePeriod,
)
}

View File

@ -36,16 +36,16 @@ type RateLimitingInterface interface {
// Remember to call Forget! If you don't, you may end up tracking failures forever.
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewNamedRateLimitingQueue instead.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
func NewRateLimitingQueue(rateLimiter RateLimiter, opts ...QueueOption) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewDelayingQueue(),
DelayingInterface: NewDelayingQueue(opts...),
rateLimiter: rateLimiter,
}
}
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string, opts ...QueueOption) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewNamedDelayingQueue(name),
DelayingInterface: NewNamedDelayingQueue(name, opts...),
rateLimiter: rateLimiter,
}
}