[queue] Implement ShutDownWithDrain allowing the queue to drain when shutting down

Signed-off-by: Alexander Constantinescu <aconstan@redhat.com>

Kubernetes-commit: 5b740f430e0a4892e9db3a1fea9f349a06267755
This commit is contained in:
Alexander Constantinescu 2021-05-12 02:26:22 +02:00 committed by Kubernetes Publisher
parent 9026029b9a
commit 22aa998def
2 changed files with 304 additions and 78 deletions

View File

@ -29,6 +29,7 @@ type Interface interface {
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShutDownWithDrain()
ShuttingDown() bool
}
@ -86,6 +87,7 @@ type Type struct {
cond *sync.Cond
shuttingDown bool
drain bool
metrics queueMetrics
@ -110,6 +112,10 @@ func (s set) delete(item t) {
delete(s, item)
}
func (s set) len() int {
return len(s)
}
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
@ -178,13 +184,71 @@ func (q *Type) Done(item interface{}) {
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
} else if q.processing.len() == 0 {
q.cond.Signal()
}
}
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
// ShutDown will cause q to ignore all new items added to it and
// immediately instruct the worker goroutines to exit.
func (q *Type) ShutDown() {
q.setDrain(false)
q.shutdown()
}
// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
// as the worker goroutines have "drained", i.e: finished processing and called
// Done on all existing items in the queue; they will be instructed to exit and
// ShutDownWithDrain will return. Hence: a strict requirement for using this is;
// your workers must ensure that Done is called on all items in the queue once
// the shut down has been initiated, if that is not the case: this will block
// indefinitely. It is, however, safe to call ShutDown after having called
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
// without waiting for the drainage.
func (q *Type) ShutDownWithDrain() {
q.setDrain(true)
q.shutdown()
for q.isProcessing() && q.shouldDrain() {
q.waitForProcessing()
}
}
// isProcessing indicates if there are still items on the work queue being
// processed. It's used to drain the work queue on an eventual shutdown.
func (q *Type) isProcessing() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.processing.len() != 0
}
// waitForProcessing waits for the worker goroutines to finish processing items
// and call Done on them.
func (q *Type) waitForProcessing() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// Ensure that we do not wait on a queue which is already empty, as that
// could result in waiting for Done to be called on items in an empty queue
// which has already been shut down, which will result in waiting
// indefinitely.
if q.processing.len() == 0 {
return
}
q.cond.Wait()
}
func (q *Type) setDrain(shouldDrain bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.drain = shouldDrain
}
func (q *Type) shouldDrain() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.drain
}
func (q *Type) shutdown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true

View File

@ -25,93 +25,127 @@ import (
)
func TestBasic(t *testing.T) {
// If something is seriously wrong this test will never complete.
q := workqueue.New()
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
for j := 0; j < 50; j++ {
q.Add(i)
time.Sleep(time.Millisecond)
}
}(i)
tests := []struct {
queue *workqueue.Type
queueShutDown func(workqueue.Interface)
}{
{
queue: workqueue.New(),
queueShutDown: workqueue.Interface.ShutDown,
},
{
queue: workqueue.New(),
queueShutDown: workqueue.Interface.ShutDownWithDrain,
},
}
for _, test := range tests {
// If something is seriously wrong this test will never complete.
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
for {
item, quit := q.Get()
if item == "added after shutdown!" {
t.Errorf("Got an item added after shutdown.")
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
for j := 0; j < 50; j++ {
test.queue.Add(i)
time.Sleep(time.Millisecond)
}
if quit {
return
}(i)
}
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
for {
item, quit := test.queue.Get()
if item == "added after shutdown!" {
t.Errorf("Got an item added after shutdown.")
}
if quit {
return
}
t.Logf("Worker %v: begin processing %v", i, item)
time.Sleep(3 * time.Millisecond)
t.Logf("Worker %v: done processing %v", i, item)
test.queue.Done(item)
}
t.Logf("Worker %v: begin processing %v", i, item)
time.Sleep(3 * time.Millisecond)
t.Logf("Worker %v: done processing %v", i, item)
q.Done(item)
}
}(i)
}(i)
}
producerWG.Wait()
test.queueShutDown(test.queue)
test.queue.Add("added after shutdown!")
consumerWG.Wait()
if test.queue.Len() != 0 {
t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
}
}
producerWG.Wait()
q.ShutDown()
q.Add("added after shutdown!")
consumerWG.Wait()
}
func TestAddWhileProcessing(t *testing.T) {
q := workqueue.New()
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
q.Add(i)
}(i)
tests := []struct {
queue *workqueue.Type
queueShutDown func(workqueue.Interface)
}{
{
queue: workqueue.New(),
queueShutDown: workqueue.Interface.ShutDown,
},
{
queue: workqueue.New(),
queueShutDown: workqueue.Interface.ShutDownWithDrain,
},
}
for _, test := range tests {
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
// Every worker will re-add every item up to two times.
// This tests the dirty-while-processing case.
counters := map[interface{}]int{}
for {
item, quit := q.Get()
if quit {
return
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
test.queue.Add(i)
}(i)
}
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
// Every worker will re-add every item up to two times.
// This tests the dirty-while-processing case.
counters := map[interface{}]int{}
for {
item, quit := test.queue.Get()
if quit {
return
}
counters[item]++
if counters[item] < 2 {
test.queue.Add(item)
}
test.queue.Done(item)
}
counters[item]++
if counters[item] < 2 {
q.Add(item)
}
q.Done(item)
}
}(i)
}(i)
}
producerWG.Wait()
test.queueShutDown(test.queue)
consumerWG.Wait()
if test.queue.Len() != 0 {
t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
}
}
producerWG.Wait()
q.ShutDown()
consumerWG.Wait()
}
func TestLen(t *testing.T) {
@ -159,3 +193,131 @@ func TestReinsert(t *testing.T) {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}
func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {
q := workqueue.New()
q.Add("foo")
q.Add("bar")
firstItem, _ := q.Get()
secondItem, _ := q.Get()
finishedWG := sync.WaitGroup{}
finishedWG.Add(1)
go func() {
defer finishedWG.Done()
q.ShutDownWithDrain()
}()
// This is done as to simulate a sequence of events where ShutDownWithDrain
// is called before we start marking all items as done - thus simulating a
// drain where we wait for all items to finish processing.
shuttingDown := false
for !shuttingDown {
_, shuttingDown = q.Get()
}
// Mark the first two items as done, as to finish up
q.Done(firstItem)
q.Done(secondItem)
finishedWG.Wait()
}
func TestNoQueueDrainageUsingShutDown(t *testing.T) {
q := workqueue.New()
q.Add("foo")
q.Add("bar")
q.Get()
q.Get()
finishedWG := sync.WaitGroup{}
finishedWG.Add(1)
go func() {
defer finishedWG.Done()
// Invoke ShutDown: suspending the execution immediately.
q.ShutDown()
}()
// We can now do this and not have the test timeout because we didn't call
// Done on the first two items before arriving here.
finishedWG.Wait()
}
func TestForceQueueShutdownUsingShutDown(t *testing.T) {
q := workqueue.New()
q.Add("foo")
q.Add("bar")
q.Get()
q.Get()
finishedWG := sync.WaitGroup{}
finishedWG.Add(1)
go func() {
defer finishedWG.Done()
q.ShutDownWithDrain()
}()
// This is done as to simulate a sequence of events where ShutDownWithDrain
// is called before ShutDown
shuttingDown := false
for !shuttingDown {
_, shuttingDown = q.Get()
}
// Use ShutDown to force the queue to shut down (simulating a caller
// which can invoke this function on a second SIGTERM/SIGINT)
q.ShutDown()
// We can now do this and not have the test timeout because we didn't call
// done on any of the items before arriving here.
finishedWG.Wait()
}
func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) {
q := workqueue.New()
q.Add("foo")
gotten, _ := q.Get()
q.Add("foo")
finishedWG := sync.WaitGroup{}
finishedWG.Add(1)
go func() {
defer finishedWG.Done()
q.ShutDownWithDrain()
}()
// Ensure that ShutDownWithDrain has started and is blocked.
shuttingDown := false
for !shuttingDown {
_, shuttingDown = q.Get()
}
// Finish "working".
q.Done(gotten)
// `shuttingDown` becomes false because Done caused an item to go back into
// the queue.
again, shuttingDown := q.Get()
if shuttingDown {
t.Fatalf("should not have been done")
}
q.Done(again)
// Now we are really done.
_, shuttingDown = q.Get()
if !shuttingDown {
t.Fatalf("should have been done")
}
finishedWG.Wait()
}