diff --git a/util/workqueue/metrics_test.go b/util/workqueue/metrics_test.go index 3c3cc361..a98a728f 100644 --- a/util/workqueue/metrics_test.go +++ b/util/workqueue/metrics_test.go @@ -41,7 +41,7 @@ func TestMetricShutdown(t *testing.T) { updateCalled: ch, } c := testingclock.NewFakeClock(time.Now()) - q := newQueue(c, m, time.Millisecond) + q := newQueue(c, DefaultQueue(), m, time.Millisecond) for !c.HasWaiters() { // Wait for the go routine to call NewTicker() time.Sleep(time.Millisecond) diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index a363d1af..163d65c0 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -33,6 +33,48 @@ type Interface interface { ShuttingDown() bool } +// Queue is the underlying storage for items. The functions below are always +// called from the same goroutine. +type Queue interface { + // Touch can be hooked when an existing item is added again. This may be + // useful if the implementation allows priority change for the given item. + Touch(item interface{}) + // Push adds a new item. + Push(item interface{}) + // Len tells the total number of items. + Len() int + // Pop retrieves an item. + Pop() (item interface{}) +} + +// DefaultQueue is a slice based FIFO queue. +func DefaultQueue() Queue { + return new(queue) +} + +// queue is a slice which implements Queue. +type queue []interface{} + +func (q *queue) Touch(item interface{}) {} + +func (q *queue) Push(item interface{}) { + *q = append(*q, item) +} + +func (q *queue) Len() int { + return len(*q) +} + +func (q *queue) Pop() (item interface{}) { + item = (*q)[0] + + // The underlying array still exists and reference this object, so the object will not be garbage collected. + (*q)[0] = nil + *q = (*q)[1:] + + return item +} + // QueueConfig specifies optional configurations to customize an Interface. type QueueConfig struct { // Name for the queue. If unnamed, the metrics will not be registered. @@ -44,6 +86,9 @@ type QueueConfig struct { // Clock ability to inject real or fake clock for testing purposes. Clock clock.WithTicker + + // Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue. + Queue Queue } // New constructs a new work queue (see the package comment). @@ -83,16 +128,22 @@ func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type { config.Clock = clock.RealClock{} } + if config.Queue == nil { + config.Queue = DefaultQueue() + } + return newQueue( config.Clock, + config.Queue, metricsFactory.newQueueMetrics(config.Name, config.Clock), updatePeriod, ) } -func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type { +func newQueue(c clock.WithTicker, queue Queue, metrics queueMetrics, updatePeriod time.Duration) *Type { t := &Type{ clock: c, + queue: queue, dirty: set{}, processing: set{}, cond: sync.NewCond(&sync.Mutex{}), @@ -116,7 +167,7 @@ type Type struct { // queue defines the order in which we will work on items. Every // element of queue should be in the dirty set and not in the // processing set. - queue []t + queue Queue // dirty defines all of the items that need to be processed. dirty set @@ -167,6 +218,11 @@ func (q *Type) Add(item interface{}) { return } if q.dirty.has(item) { + // the same item is added again before it is processed, call the Touch + // function if the queue cares about it (for e.g, reset its priority) + if !q.processing.has(item) { + q.queue.Touch(item) + } return } @@ -177,7 +233,7 @@ func (q *Type) Add(item interface{}) { return } - q.queue = append(q.queue, item) + q.queue.Push(item) q.cond.Signal() } @@ -187,7 +243,7 @@ func (q *Type) Add(item interface{}) { func (q *Type) Len() int { q.cond.L.Lock() defer q.cond.L.Unlock() - return len(q.queue) + return q.queue.Len() } // Get blocks until it can return an item to be processed. If shutdown = true, @@ -196,18 +252,15 @@ func (q *Type) Len() int { func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock() defer q.cond.L.Unlock() - for len(q.queue) == 0 && !q.shuttingDown { + for q.queue.Len() == 0 && !q.shuttingDown { q.cond.Wait() } - if len(q.queue) == 0 { + if q.queue.Len() == 0 { // We must be shutting down. return nil, true } - item = q.queue[0] - // The underlying array still exists and reference this object, so the object will not be garbage collected. - q.queue[0] = nil - q.queue = q.queue[1:] + item = q.queue.Pop() q.metrics.get(item) @@ -228,7 +281,7 @@ func (q *Type) Done(item interface{}) { q.processing.delete(item) if q.dirty.has(item) { - q.queue = append(q.queue, item) + q.queue.Push(item) q.cond.Signal() } else if q.processing.len() == 0 { q.cond.Signal() diff --git a/util/workqueue/queue_test.go b/util/workqueue/queue_test.go index e2a33973..1cf2cd2f 100644 --- a/util/workqueue/queue_test.go +++ b/util/workqueue/queue_test.go @@ -27,6 +27,23 @@ import ( "k8s.io/client-go/util/workqueue" ) +// traceQueue traces whether items are touched +type traceQueue struct { + workqueue.Queue + + touched map[interface{}]struct{} +} + +func (t *traceQueue) Touch(item interface{}) { + t.Queue.Touch(item) + if t.touched == nil { + t.touched = make(map[interface{}]struct{}) + } + t.touched[item] = struct{}{} +} + +var _ workqueue.Queue = &traceQueue{} + func TestBasic(t *testing.T) { tests := []struct { queue *workqueue.Type @@ -198,7 +215,11 @@ func TestReinsert(t *testing.T) { } func TestCollapse(t *testing.T) { - q := workqueue.New() + tq := &traceQueue{Queue: workqueue.DefaultQueue()} + q := workqueue.NewWithConfig(workqueue.QueueConfig{ + Name: "", + Queue: tq, + }) // Add a new one twice q.Add("bar") q.Add("bar") @@ -216,10 +237,18 @@ func TestCollapse(t *testing.T) { if a := q.Len(); a != 0 { t.Errorf("Expected queue to be empty. Has %v items", a) } + + if _, ok := tq.touched["bar"]; !ok { + t.Errorf("Expected bar to be Touched") + } } func TestCollapseWhileProcessing(t *testing.T) { - q := workqueue.New() + tq := &traceQueue{Queue: workqueue.DefaultQueue()} + q := workqueue.NewWithConfig(workqueue.QueueConfig{ + Name: "", + Queue: tq, + }) q.Add("foo") // Start processing @@ -261,6 +290,10 @@ func TestCollapseWhileProcessing(t *testing.T) { if a := q.Len(); a != 0 { t.Errorf("Expected queue to be empty. Has %v items", a) } + + if _, ok := tq.touched["foo"]; ok { + t.Errorf("Unexpected Touch") + } } func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {