From 22aa998def2b0a684bc80b574268ac34e22a9dd9 Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Wed, 12 May 2021 02:26:22 +0200 Subject: [PATCH] [queue] Implement `ShutDownWithDrain` allowing the queue to drain when shutting down Signed-off-by: Alexander Constantinescu Kubernetes-commit: 5b740f430e0a4892e9db3a1fea9f349a06267755 --- util/workqueue/queue.go | 70 +++++++- util/workqueue/queue_test.go | 312 ++++++++++++++++++++++++++--------- 2 files changed, 304 insertions(+), 78 deletions(-) diff --git a/util/workqueue/queue.go b/util/workqueue/queue.go index f7c14ddc..d563ae4e 100644 --- a/util/workqueue/queue.go +++ b/util/workqueue/queue.go @@ -29,6 +29,7 @@ type Interface interface { Get() (item interface{}, shutdown bool) Done(item interface{}) ShutDown() + ShutDownWithDrain() ShuttingDown() bool } @@ -86,6 +87,7 @@ type Type struct { cond *sync.Cond shuttingDown bool + drain bool metrics queueMetrics @@ -110,6 +112,10 @@ func (s set) delete(item t) { delete(s, item) } +func (s set) len() int { + return len(s) +} + // Add marks item as needing processing. func (q *Type) Add(item interface{}) { q.cond.L.Lock() @@ -178,13 +184,71 @@ func (q *Type) Done(item interface{}) { if q.dirty.has(item) { q.queue = append(q.queue, item) q.cond.Signal() + } else if q.processing.len() == 0 { + q.cond.Signal() } } -// ShutDown will cause q to ignore all new items added to it. As soon as the -// worker goroutines have drained the existing items in the queue, they will be -// instructed to exit. +// ShutDown will cause q to ignore all new items added to it and +// immediately instruct the worker goroutines to exit. func (q *Type) ShutDown() { + q.setDrain(false) + q.shutdown() +} + +// ShutDownWithDrain will cause q to ignore all new items added to it. As soon +// as the worker goroutines have "drained", i.e: finished processing and called +// Done on all existing items in the queue; they will be instructed to exit and +// ShutDownWithDrain will return. Hence: a strict requirement for using this is; +// your workers must ensure that Done is called on all items in the queue once +// the shut down has been initiated, if that is not the case: this will block +// indefinitely. It is, however, safe to call ShutDown after having called +// ShutDownWithDrain, as to force the queue shut down to terminate immediately +// without waiting for the drainage. +func (q *Type) ShutDownWithDrain() { + q.setDrain(true) + q.shutdown() + for q.isProcessing() && q.shouldDrain() { + q.waitForProcessing() + } +} + +// isProcessing indicates if there are still items on the work queue being +// processed. It's used to drain the work queue on an eventual shutdown. +func (q *Type) isProcessing() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return q.processing.len() != 0 +} + +// waitForProcessing waits for the worker goroutines to finish processing items +// and call Done on them. +func (q *Type) waitForProcessing() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + // Ensure that we do not wait on a queue which is already empty, as that + // could result in waiting for Done to be called on items in an empty queue + // which has already been shut down, which will result in waiting + // indefinitely. + if q.processing.len() == 0 { + return + } + q.cond.Wait() +} + +func (q *Type) setDrain(shouldDrain bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.drain = shouldDrain +} + +func (q *Type) shouldDrain() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return q.drain +} + +func (q *Type) shutdown() { q.cond.L.Lock() defer q.cond.L.Unlock() q.shuttingDown = true diff --git a/util/workqueue/queue_test.go b/util/workqueue/queue_test.go index 131f4a2a..2514746e 100644 --- a/util/workqueue/queue_test.go +++ b/util/workqueue/queue_test.go @@ -25,93 +25,127 @@ import ( ) func TestBasic(t *testing.T) { - // If something is seriously wrong this test will never complete. - q := workqueue.New() - - // Start producers - const producers = 50 - producerWG := sync.WaitGroup{} - producerWG.Add(producers) - for i := 0; i < producers; i++ { - go func(i int) { - defer producerWG.Done() - for j := 0; j < 50; j++ { - q.Add(i) - time.Sleep(time.Millisecond) - } - }(i) + tests := []struct { + queue *workqueue.Type + queueShutDown func(workqueue.Interface) + }{ + { + queue: workqueue.New(), + queueShutDown: workqueue.Interface.ShutDown, + }, + { + queue: workqueue.New(), + queueShutDown: workqueue.Interface.ShutDownWithDrain, + }, } + for _, test := range tests { + // If something is seriously wrong this test will never complete. - // Start consumers - const consumers = 10 - consumerWG := sync.WaitGroup{} - consumerWG.Add(consumers) - for i := 0; i < consumers; i++ { - go func(i int) { - defer consumerWG.Done() - for { - item, quit := q.Get() - if item == "added after shutdown!" { - t.Errorf("Got an item added after shutdown.") + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + for j := 0; j < 50; j++ { + test.queue.Add(i) + time.Sleep(time.Millisecond) } - if quit { - return + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + for { + item, quit := test.queue.Get() + if item == "added after shutdown!" { + t.Errorf("Got an item added after shutdown.") + } + if quit { + return + } + t.Logf("Worker %v: begin processing %v", i, item) + time.Sleep(3 * time.Millisecond) + t.Logf("Worker %v: done processing %v", i, item) + test.queue.Done(item) } - t.Logf("Worker %v: begin processing %v", i, item) - time.Sleep(3 * time.Millisecond) - t.Logf("Worker %v: done processing %v", i, item) - q.Done(item) - } - }(i) + }(i) + } + + producerWG.Wait() + test.queueShutDown(test.queue) + test.queue.Add("added after shutdown!") + consumerWG.Wait() + if test.queue.Len() != 0 { + t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len()) + } } - - producerWG.Wait() - q.ShutDown() - q.Add("added after shutdown!") - consumerWG.Wait() } func TestAddWhileProcessing(t *testing.T) { - q := workqueue.New() - - // Start producers - const producers = 50 - producerWG := sync.WaitGroup{} - producerWG.Add(producers) - for i := 0; i < producers; i++ { - go func(i int) { - defer producerWG.Done() - q.Add(i) - }(i) + tests := []struct { + queue *workqueue.Type + queueShutDown func(workqueue.Interface) + }{ + { + queue: workqueue.New(), + queueShutDown: workqueue.Interface.ShutDown, + }, + { + queue: workqueue.New(), + queueShutDown: workqueue.Interface.ShutDownWithDrain, + }, } + for _, test := range tests { - // Start consumers - const consumers = 10 - consumerWG := sync.WaitGroup{} - consumerWG.Add(consumers) - for i := 0; i < consumers; i++ { - go func(i int) { - defer consumerWG.Done() - // Every worker will re-add every item up to two times. - // This tests the dirty-while-processing case. - counters := map[interface{}]int{} - for { - item, quit := q.Get() - if quit { - return + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + test.queue.Add(i) + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + // Every worker will re-add every item up to two times. + // This tests the dirty-while-processing case. + counters := map[interface{}]int{} + for { + item, quit := test.queue.Get() + if quit { + return + } + counters[item]++ + if counters[item] < 2 { + test.queue.Add(item) + } + test.queue.Done(item) } - counters[item]++ - if counters[item] < 2 { - q.Add(item) - } - q.Done(item) - } - }(i) + }(i) + } + + producerWG.Wait() + test.queueShutDown(test.queue) + consumerWG.Wait() + if test.queue.Len() != 0 { + t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len()) + } } - - producerWG.Wait() - q.ShutDown() - consumerWG.Wait() } func TestLen(t *testing.T) { @@ -159,3 +193,131 @@ func TestReinsert(t *testing.T) { t.Errorf("Expected queue to be empty. Has %v items", a) } } + +func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) { + + q := workqueue.New() + + q.Add("foo") + q.Add("bar") + + firstItem, _ := q.Get() + secondItem, _ := q.Get() + + finishedWG := sync.WaitGroup{} + finishedWG.Add(1) + go func() { + defer finishedWG.Done() + q.ShutDownWithDrain() + }() + + // This is done as to simulate a sequence of events where ShutDownWithDrain + // is called before we start marking all items as done - thus simulating a + // drain where we wait for all items to finish processing. + shuttingDown := false + for !shuttingDown { + _, shuttingDown = q.Get() + } + + // Mark the first two items as done, as to finish up + q.Done(firstItem) + q.Done(secondItem) + + finishedWG.Wait() +} + +func TestNoQueueDrainageUsingShutDown(t *testing.T) { + + q := workqueue.New() + + q.Add("foo") + q.Add("bar") + + q.Get() + q.Get() + + finishedWG := sync.WaitGroup{} + finishedWG.Add(1) + go func() { + defer finishedWG.Done() + // Invoke ShutDown: suspending the execution immediately. + q.ShutDown() + }() + + // We can now do this and not have the test timeout because we didn't call + // Done on the first two items before arriving here. + finishedWG.Wait() +} + +func TestForceQueueShutdownUsingShutDown(t *testing.T) { + + q := workqueue.New() + + q.Add("foo") + q.Add("bar") + + q.Get() + q.Get() + + finishedWG := sync.WaitGroup{} + finishedWG.Add(1) + go func() { + defer finishedWG.Done() + q.ShutDownWithDrain() + }() + + // This is done as to simulate a sequence of events where ShutDownWithDrain + // is called before ShutDown + shuttingDown := false + for !shuttingDown { + _, shuttingDown = q.Get() + } + + // Use ShutDown to force the queue to shut down (simulating a caller + // which can invoke this function on a second SIGTERM/SIGINT) + q.ShutDown() + + // We can now do this and not have the test timeout because we didn't call + // done on any of the items before arriving here. + finishedWG.Wait() +} + +func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) { + q := workqueue.New() + + q.Add("foo") + gotten, _ := q.Get() + q.Add("foo") + + finishedWG := sync.WaitGroup{} + finishedWG.Add(1) + go func() { + defer finishedWG.Done() + q.ShutDownWithDrain() + }() + + // Ensure that ShutDownWithDrain has started and is blocked. + shuttingDown := false + for !shuttingDown { + _, shuttingDown = q.Get() + } + + // Finish "working". + q.Done(gotten) + + // `shuttingDown` becomes false because Done caused an item to go back into + // the queue. + again, shuttingDown := q.Get() + if shuttingDown { + t.Fatalf("should not have been done") + } + q.Done(again) + + // Now we are really done. + _, shuttingDown = q.Get() + if !shuttingDown { + t.Fatalf("should have been done") + } + + finishedWG.Wait() +}