diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 1026fb3f..9b861e8e 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -183,8 +183,7 @@ type DeltaFIFO struct { // Indication the queue is closed. // Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Currently, not used to gate any of CRED operations. - closed bool - closedLock sync.Mutex + closed bool // emitDeltaTypeReplaced is whether to emit the Replaced or Sync // DeltaType when Replace() is called (to preserve backwards compat). @@ -204,8 +203,8 @@ var ( // Close the queue. func (f *DeltaFIFO) Close() { - f.closedLock.Lock() - defer f.closedLock.Unlock() + f.lock.Lock() + defer f.lock.Unlock() f.closed = true f.cond.Broadcast() } @@ -447,8 +446,8 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err // IsClosed checks if the queue is closed func (f *DeltaFIFO) IsClosed() bool { - f.closedLock.Lock() - defer f.closedLock.Unlock() + f.lock.Lock() + defer f.lock.Unlock() return f.closed } @@ -472,7 +471,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). - if f.IsClosed() { + if f.closed { return nil, ErrFIFOClosed } diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 32cc96ac..31cbe880 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -19,6 +19,7 @@ package cache import ( "fmt" "reflect" + "runtime" "testing" "time" ) @@ -645,3 +646,36 @@ func TestDeltaFIFO_HasSynced(t *testing.T) { } } } + +// TestDeltaFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue +// should unblock and return after Close is called. +func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + }) + + c := make(chan struct{}) + const jobs = 10 + for i := 0; i < jobs; i++ { + go func() { + f.Pop(func(obj interface{}) error { + return nil + }) + c <- struct{}{} + }() + } + + runtime.Gosched() + f.Close() + + for i := 0; i < jobs; i++ { + select { + case <-c: + case <-time.After(500 * time.Millisecond): + t.Fatalf("timed out waiting for Pop to return after Close") + } + } +} diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index b9e269fa..edb2c8ed 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -128,8 +128,7 @@ type FIFO struct { // Indication the queue is closed. // Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Currently, not used to gate any of CRED operations. - closed bool - closedLock sync.Mutex + closed bool } var ( @@ -138,8 +137,8 @@ var ( // Close the queue. func (f *FIFO) Close() { - f.closedLock.Lock() - defer f.closedLock.Unlock() + f.lock.Lock() + defer f.lock.Unlock() f.closed = true f.cond.Broadcast() } @@ -262,8 +261,8 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { // IsClosed checks if the queue is closed func (f *FIFO) IsClosed() bool { - f.closedLock.Lock() - defer f.closedLock.Unlock() + f.lock.Lock() + defer f.lock.Unlock() if f.closed { return true } @@ -284,7 +283,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). - if f.IsClosed() { + if f.closed { return nil, ErrFIFOClosed } diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index afd311d7..16b8502f 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -19,6 +19,7 @@ package cache import ( "fmt" "reflect" + "runtime" "testing" "time" ) @@ -278,3 +279,31 @@ func TestFIFO_HasSynced(t *testing.T) { } } } + +// TestFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue +// should unblock and return after Close is called. +func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) { + f := NewFIFO(testFifoObjectKeyFunc) + + c := make(chan struct{}) + const jobs = 10 + for i := 0; i < jobs; i++ { + go func() { + f.Pop(func(obj interface{}) error { + return nil + }) + c <- struct{}{} + }() + } + + runtime.Gosched() + f.Close() + + for i := 0; i < jobs; i++ { + select { + case <-c: + case <-time.After(500 * time.Millisecond): + t.Fatalf("timed out waiting for Pop to return after Close") + } + } +}