mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-25 14:41:53 +00:00
workqueue: make queue as configurable
The default queue implementation is mostly FIFO and it is not exchangeable unless we implement the whole `workqueue.Interface` which is less desirable as we have to duplicate a lot of code. There was one attempt done in [kubernetes/kubernetes#109349][1] which tried to implement a priority queue. That is really useful and [knative/pkg][2] implemented something called two-lane-queue. While two lane queue is great, but isn't perfect since a full slow queue can still slow down items in fast queue. This change proposes a swappable queue implementation while not adding extra maintenance effort in kubernetes community. We are happy to maintain our own queue implementation (similar to two-lane-queue) in downstream. [1]: https://github.com/kubernetes/kubernetes/pull/109349 [2]: https://github.com/knative/pkg/blob/main/controller/two_lane_queue.go Kubernetes-commit: 87b4279e07349b3c68f16f69a349a02bddd12f25
This commit is contained in:
parent
aa7909e7d7
commit
9c3db8681d
@ -41,7 +41,7 @@ func TestMetricShutdown(t *testing.T) {
|
|||||||
updateCalled: ch,
|
updateCalled: ch,
|
||||||
}
|
}
|
||||||
c := testingclock.NewFakeClock(time.Now())
|
c := testingclock.NewFakeClock(time.Now())
|
||||||
q := newQueue(c, m, time.Millisecond)
|
q := newQueue(c, DefaultQueue(), m, time.Millisecond)
|
||||||
for !c.HasWaiters() {
|
for !c.HasWaiters() {
|
||||||
// Wait for the go routine to call NewTicker()
|
// Wait for the go routine to call NewTicker()
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
|
@ -33,6 +33,48 @@ type Interface interface {
|
|||||||
ShuttingDown() bool
|
ShuttingDown() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Queue is the underlying storage for items. The functions below are always
|
||||||
|
// called from the same goroutine.
|
||||||
|
type Queue interface {
|
||||||
|
// Touch can be hooked when an existing item is added again. This may be
|
||||||
|
// useful if the implementation allows priority change for the given item.
|
||||||
|
Touch(item interface{})
|
||||||
|
// Push adds a new item.
|
||||||
|
Push(item interface{})
|
||||||
|
// Len tells the total number of items.
|
||||||
|
Len() int
|
||||||
|
// Pop retrieves an item.
|
||||||
|
Pop() (item interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultQueue is a slice based FIFO queue.
|
||||||
|
func DefaultQueue() Queue {
|
||||||
|
return new(queue)
|
||||||
|
}
|
||||||
|
|
||||||
|
// queue is a slice which implements Queue.
|
||||||
|
type queue []interface{}
|
||||||
|
|
||||||
|
func (q *queue) Touch(item interface{}) {}
|
||||||
|
|
||||||
|
func (q *queue) Push(item interface{}) {
|
||||||
|
*q = append(*q, item)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *queue) Len() int {
|
||||||
|
return len(*q)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *queue) Pop() (item interface{}) {
|
||||||
|
item = (*q)[0]
|
||||||
|
|
||||||
|
// The underlying array still exists and reference this object, so the object will not be garbage collected.
|
||||||
|
(*q)[0] = nil
|
||||||
|
*q = (*q)[1:]
|
||||||
|
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
// QueueConfig specifies optional configurations to customize an Interface.
|
// QueueConfig specifies optional configurations to customize an Interface.
|
||||||
type QueueConfig struct {
|
type QueueConfig struct {
|
||||||
// Name for the queue. If unnamed, the metrics will not be registered.
|
// Name for the queue. If unnamed, the metrics will not be registered.
|
||||||
@ -44,6 +86,9 @@ type QueueConfig struct {
|
|||||||
|
|
||||||
// Clock ability to inject real or fake clock for testing purposes.
|
// Clock ability to inject real or fake clock for testing purposes.
|
||||||
Clock clock.WithTicker
|
Clock clock.WithTicker
|
||||||
|
|
||||||
|
// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
|
||||||
|
Queue Queue
|
||||||
}
|
}
|
||||||
|
|
||||||
// New constructs a new work queue (see the package comment).
|
// New constructs a new work queue (see the package comment).
|
||||||
@ -83,16 +128,22 @@ func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
|
|||||||
config.Clock = clock.RealClock{}
|
config.Clock = clock.RealClock{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.Queue == nil {
|
||||||
|
config.Queue = DefaultQueue()
|
||||||
|
}
|
||||||
|
|
||||||
return newQueue(
|
return newQueue(
|
||||||
config.Clock,
|
config.Clock,
|
||||||
|
config.Queue,
|
||||||
metricsFactory.newQueueMetrics(config.Name, config.Clock),
|
metricsFactory.newQueueMetrics(config.Name, config.Clock),
|
||||||
updatePeriod,
|
updatePeriod,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type {
|
func newQueue(c clock.WithTicker, queue Queue, metrics queueMetrics, updatePeriod time.Duration) *Type {
|
||||||
t := &Type{
|
t := &Type{
|
||||||
clock: c,
|
clock: c,
|
||||||
|
queue: queue,
|
||||||
dirty: set{},
|
dirty: set{},
|
||||||
processing: set{},
|
processing: set{},
|
||||||
cond: sync.NewCond(&sync.Mutex{}),
|
cond: sync.NewCond(&sync.Mutex{}),
|
||||||
@ -116,7 +167,7 @@ type Type struct {
|
|||||||
// queue defines the order in which we will work on items. Every
|
// queue defines the order in which we will work on items. Every
|
||||||
// element of queue should be in the dirty set and not in the
|
// element of queue should be in the dirty set and not in the
|
||||||
// processing set.
|
// processing set.
|
||||||
queue []t
|
queue Queue
|
||||||
|
|
||||||
// dirty defines all of the items that need to be processed.
|
// dirty defines all of the items that need to be processed.
|
||||||
dirty set
|
dirty set
|
||||||
@ -167,6 +218,11 @@ func (q *Type) Add(item interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if q.dirty.has(item) {
|
if q.dirty.has(item) {
|
||||||
|
// the same item is added again before it is processed, call the Touch
|
||||||
|
// function if the queue cares about it (for e.g, reset its priority)
|
||||||
|
if !q.processing.has(item) {
|
||||||
|
q.queue.Touch(item)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,7 +233,7 @@ func (q *Type) Add(item interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
q.queue = append(q.queue, item)
|
q.queue.Push(item)
|
||||||
q.cond.Signal()
|
q.cond.Signal()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -187,7 +243,7 @@ func (q *Type) Add(item interface{}) {
|
|||||||
func (q *Type) Len() int {
|
func (q *Type) Len() int {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
return len(q.queue)
|
return q.queue.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get blocks until it can return an item to be processed. If shutdown = true,
|
// Get blocks until it can return an item to be processed. If shutdown = true,
|
||||||
@ -196,18 +252,15 @@ func (q *Type) Len() int {
|
|||||||
func (q *Type) Get() (item interface{}, shutdown bool) {
|
func (q *Type) Get() (item interface{}, shutdown bool) {
|
||||||
q.cond.L.Lock()
|
q.cond.L.Lock()
|
||||||
defer q.cond.L.Unlock()
|
defer q.cond.L.Unlock()
|
||||||
for len(q.queue) == 0 && !q.shuttingDown {
|
for q.queue.Len() == 0 && !q.shuttingDown {
|
||||||
q.cond.Wait()
|
q.cond.Wait()
|
||||||
}
|
}
|
||||||
if len(q.queue) == 0 {
|
if q.queue.Len() == 0 {
|
||||||
// We must be shutting down.
|
// We must be shutting down.
|
||||||
return nil, true
|
return nil, true
|
||||||
}
|
}
|
||||||
|
|
||||||
item = q.queue[0]
|
item = q.queue.Pop()
|
||||||
// The underlying array still exists and reference this object, so the object will not be garbage collected.
|
|
||||||
q.queue[0] = nil
|
|
||||||
q.queue = q.queue[1:]
|
|
||||||
|
|
||||||
q.metrics.get(item)
|
q.metrics.get(item)
|
||||||
|
|
||||||
@ -228,7 +281,7 @@ func (q *Type) Done(item interface{}) {
|
|||||||
|
|
||||||
q.processing.delete(item)
|
q.processing.delete(item)
|
||||||
if q.dirty.has(item) {
|
if q.dirty.has(item) {
|
||||||
q.queue = append(q.queue, item)
|
q.queue.Push(item)
|
||||||
q.cond.Signal()
|
q.cond.Signal()
|
||||||
} else if q.processing.len() == 0 {
|
} else if q.processing.len() == 0 {
|
||||||
q.cond.Signal()
|
q.cond.Signal()
|
||||||
|
@ -27,6 +27,23 @@ import (
|
|||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// traceQueue traces whether items are touched
|
||||||
|
type traceQueue struct {
|
||||||
|
workqueue.Queue
|
||||||
|
|
||||||
|
touched map[interface{}]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *traceQueue) Touch(item interface{}) {
|
||||||
|
t.Queue.Touch(item)
|
||||||
|
if t.touched == nil {
|
||||||
|
t.touched = make(map[interface{}]struct{})
|
||||||
|
}
|
||||||
|
t.touched[item] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ workqueue.Queue = &traceQueue{}
|
||||||
|
|
||||||
func TestBasic(t *testing.T) {
|
func TestBasic(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
queue *workqueue.Type
|
queue *workqueue.Type
|
||||||
@ -198,7 +215,11 @@ func TestReinsert(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestCollapse(t *testing.T) {
|
func TestCollapse(t *testing.T) {
|
||||||
q := workqueue.New()
|
tq := &traceQueue{Queue: workqueue.DefaultQueue()}
|
||||||
|
q := workqueue.NewWithConfig(workqueue.QueueConfig{
|
||||||
|
Name: "",
|
||||||
|
Queue: tq,
|
||||||
|
})
|
||||||
// Add a new one twice
|
// Add a new one twice
|
||||||
q.Add("bar")
|
q.Add("bar")
|
||||||
q.Add("bar")
|
q.Add("bar")
|
||||||
@ -216,10 +237,18 @@ func TestCollapse(t *testing.T) {
|
|||||||
if a := q.Len(); a != 0 {
|
if a := q.Len(); a != 0 {
|
||||||
t.Errorf("Expected queue to be empty. Has %v items", a)
|
t.Errorf("Expected queue to be empty. Has %v items", a)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := tq.touched["bar"]; !ok {
|
||||||
|
t.Errorf("Expected bar to be Touched")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCollapseWhileProcessing(t *testing.T) {
|
func TestCollapseWhileProcessing(t *testing.T) {
|
||||||
q := workqueue.New()
|
tq := &traceQueue{Queue: workqueue.DefaultQueue()}
|
||||||
|
q := workqueue.NewWithConfig(workqueue.QueueConfig{
|
||||||
|
Name: "",
|
||||||
|
Queue: tq,
|
||||||
|
})
|
||||||
q.Add("foo")
|
q.Add("foo")
|
||||||
|
|
||||||
// Start processing
|
// Start processing
|
||||||
@ -261,6 +290,10 @@ func TestCollapseWhileProcessing(t *testing.T) {
|
|||||||
if a := q.Len(); a != 0 {
|
if a := q.Len(); a != 0 {
|
||||||
t.Errorf("Expected queue to be empty. Has %v items", a)
|
t.Errorf("Expected queue to be empty. Has %v items", a)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := tq.touched["foo"]; ok {
|
||||||
|
t.Errorf("Unexpected Touch")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {
|
func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user