Merge pull request #114242 from austince/feat/per-queue-metrics

Allow setting per-workqueue metrics providers
This commit is contained in:
Kubernetes Prow Robot 2023-03-14 16:34:26 -07:00 committed by GitHub
commit 98b1980a42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 172 additions and 37 deletions

View File

@ -33,38 +33,81 @@ type DelayingInterface interface {
AddAfter(item interface{}, duration time.Duration) AddAfter(item interface{}, duration time.Duration)
} }
// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
type DelayingQueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock optionally allows injecting a real or fake clock for testing purposes.
Clock clock.WithTicker
// Queue optionally allows injecting custom queue Interface instead of the default one.
Queue Interface
}
// NewDelayingQueue constructs a new workqueue with delayed queuing ability. // NewDelayingQueue constructs a new workqueue with delayed queuing ability.
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use // NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewNamedDelayingQueue instead. // NewDelayingQueueWithConfig instead and specify a name.
func NewDelayingQueue() DelayingInterface { func NewDelayingQueue() DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}
// NewDelayingQueueWithConfig constructs a new workqueue with options to
// customize different properties.
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
if config.Queue == nil {
config.Queue = NewWithConfig(QueueConfig{
Name: config.Name,
MetricsProvider: config.MetricsProvider,
Clock: config.Clock,
})
}
return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
} }
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
// inject custom queue Interface instead of the default one // inject custom queue Interface instead of the default one
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface { func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
return newDelayingQueue(clock.RealClock{}, q, name) return NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: name,
Queue: q,
})
} }
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability // NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewNamedDelayingQueue(name string) DelayingInterface { func NewNamedDelayingQueue(name string) DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
} }
// NewDelayingQueueWithCustomClock constructs a new named workqueue // NewDelayingQueueWithCustomClock constructs a new named workqueue
// with ability to inject real or fake clock for testing purposes // with ability to inject real or fake clock for testing purposes.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface { func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
return newDelayingQueue(clock, NewNamed(name), name) return NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: name,
Clock: clock,
})
} }
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType { func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
ret := &delayingType{ ret := &delayingType{
Interface: q, Interface: q,
clock: clock, clock: clock,
heartbeat: clock.NewTicker(maxWait), heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name), metrics: newRetryMetrics(name, provider),
} }
go ret.waitingLoop() go ret.waitingLoop()

View File

@ -29,7 +29,7 @@ import (
func TestSimpleQueue(t *testing.T) { func TestSimpleQueue(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithCustomClock(fakeClock, "") q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
first := "foo" first := "foo"
@ -71,7 +71,7 @@ func TestSimpleQueue(t *testing.T) {
func TestDeduping(t *testing.T) { func TestDeduping(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithCustomClock(fakeClock, "") q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
first := "foo" first := "foo"
@ -127,7 +127,7 @@ func TestDeduping(t *testing.T) {
func TestAddTwoFireEarly(t *testing.T) { func TestAddTwoFireEarly(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithCustomClock(fakeClock, "") q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
first := "foo" first := "foo"
second := "bar" second := "bar"
@ -176,7 +176,7 @@ func TestAddTwoFireEarly(t *testing.T) {
func TestCopyShifting(t *testing.T) { func TestCopyShifting(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithCustomClock(fakeClock, "") q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
first := "foo" first := "foo"
second := "bar" second := "bar"
@ -214,7 +214,7 @@ func TestCopyShifting(t *testing.T) {
func BenchmarkDelayingQueue_AddAfter(b *testing.B) { func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
q := NewDelayingQueueWithCustomClock(fakeClock, "") q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
// Add items // Add items
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {

View File

@ -244,13 +244,18 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
} }
} }
func newRetryMetrics(name string) retryMetrics { func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
var ret *defaultRetryMetrics var ret *defaultRetryMetrics
if len(name) == 0 { if len(name) == 0 {
return ret return ret
} }
if provider == nil {
provider = globalMetricsFactory.metricsProvider
}
return &defaultRetryMetrics{ return &defaultRetryMetrics{
retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name), retries: provider.NewRetriesMetric(name),
} }
} }

View File

@ -171,9 +171,12 @@ func TestMetrics(t *testing.T) {
mp := testMetricsProvider{} mp := testMetricsProvider{}
t0 := time.Unix(0, 0) t0 := time.Unix(0, 0)
c := testingclock.NewFakeClock(t0) c := testingclock.NewFakeClock(t0)
mf := queueMetricsFactory{metricsProvider: &mp} config := QueueConfig{
m := mf.newQueueMetrics("test", c) Name: "test",
q := newQueue(c, m, time.Millisecond) Clock: c,
MetricsProvider: &mp,
}
q := newQueueWithConfig(config, time.Millisecond)
defer q.ShutDown() defer q.ShutDown()
for !c.HasWaiters() { for !c.HasWaiters() {
// Wait for the go routine to call NewTicker() // Wait for the go routine to call NewTicker()

View File

@ -33,17 +33,60 @@ type Interface interface {
ShuttingDown() bool ShuttingDown() bool
} }
// New constructs a new work queue (see the package comment). // QueueConfig specifies optional configurations to customize an Interface.
func New() *Type { type QueueConfig struct {
return NewNamed("") // Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock ability to inject real or fake clock for testing purposes.
Clock clock.WithTicker
} }
// New constructs a new work queue (see the package comment).
func New() *Type {
return NewWithConfig(QueueConfig{
Name: "",
})
}
// NewWithConfig constructs a new workqueue with ability to
// customize different properties.
func NewWithConfig(config QueueConfig) *Type {
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
}
// NewNamed creates a new named queue.
// Deprecated: Use NewWithConfig instead.
func NewNamed(name string) *Type { func NewNamed(name string) *Type {
rc := clock.RealClock{} return NewWithConfig(QueueConfig{
Name: name,
})
}
// newQueueWithConfig constructs a new named workqueue
// with the ability to customize different properties for testing purposes
func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
var metricsFactory *queueMetricsFactory
if config.MetricsProvider != nil {
metricsFactory = &queueMetricsFactory{
metricsProvider: config.MetricsProvider,
}
} else {
metricsFactory = &globalMetricsFactory
}
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
return newQueue( return newQueue(
rc, config.Clock,
globalMetricsFactory.newQueueMetrics(name, rc), metricsFactory.newQueueMetrics(config.Name, config.Clock),
defaultUnfinishedWorkUpdatePeriod, updatePeriod,
) )
} }

View File

@ -16,6 +16,8 @@ limitations under the License.
package workqueue package workqueue
import "k8s.io/utils/clock"
// RateLimitingInterface is an interface that rate limits items being added to the queue. // RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface { type RateLimitingInterface interface {
DelayingInterface DelayingInterface
@ -32,29 +34,68 @@ type RateLimitingInterface interface {
NumRequeues(item interface{}) int NumRequeues(item interface{}) int
} }
// RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface.
type RateLimitingQueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock optionally allows injecting a real or fake clock for testing purposes.
Clock clock.WithTicker
// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
DelayingQueue DelayingInterface
}
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability // NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
// Remember to call Forget! If you don't, you may end up tracking failures forever. // Remember to call Forget! If you don't, you may end up tracking failures forever.
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use // NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewNamedRateLimitingQueue instead. // NewRateLimitingQueueWithConfig instead and specify a name.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{})
}
// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
// with options to customize different properties.
// Remember to call Forget! If you don't, you may end up tracking failures forever.
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface {
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
if config.DelayingQueue == nil {
config.DelayingQueue = NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: config.Name,
MetricsProvider: config.MetricsProvider,
Clock: config.Clock,
})
}
return &rateLimitingType{ return &rateLimitingType{
DelayingInterface: NewDelayingQueue(), DelayingInterface: config.DelayingQueue,
rateLimiter: rateLimiter, rateLimiter: rateLimiter,
} }
} }
// NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability.
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface { func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
return &rateLimitingType{ return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
DelayingInterface: NewNamedDelayingQueue(name), Name: name,
rateLimiter: rateLimiter, })
}
} }
// NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability
// with the option to inject a custom delaying queue instead of the default one.
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface { func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface {
return &rateLimitingType{ return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
DelayingInterface: di, DelayingQueue: di,
rateLimiter: rateLimiter, })
}
} }
// rateLimitingType wraps an Interface and provides rateLimited re-enquing // rateLimitingType wraps an Interface and provides rateLimited re-enquing

View File

@ -33,7 +33,7 @@ func TestRateLimitingQueue(t *testing.T) {
heartbeat: fakeClock.NewTicker(maxWait), heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""), metrics: newRetryMetrics("", nil),
} }
queue.DelayingInterface = delayingQueue queue.DelayingInterface = delayingQueue