Merge pull request #90825 from dopelsunce/master

Fix race condition between Pop and Close FIFO queue

Kubernetes-commit: 738dfcfff019a9868bd0ffd8df981e7e02630ebb
This commit is contained in:
Kubernetes Publisher 2020-05-19 23:40:32 -07:00
commit 7ab8430bef
4 changed files with 75 additions and 14 deletions

View File

@ -183,8 +183,7 @@ type DeltaFIFO struct {
// Indication the queue is closed. // Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations. // Currently, not used to gate any of CRED operations.
closed bool closed bool
closedLock sync.Mutex
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat). // DeltaType when Replace() is called (to preserve backwards compat).
@ -204,8 +203,8 @@ var (
// Close the queue. // Close the queue.
func (f *DeltaFIFO) Close() { func (f *DeltaFIFO) Close() {
f.closedLock.Lock() f.lock.Lock()
defer f.closedLock.Unlock() defer f.lock.Unlock()
f.closed = true f.closed = true
f.cond.Broadcast() f.cond.Broadcast()
} }
@ -447,8 +446,8 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
// IsClosed checks if the queue is closed // IsClosed checks if the queue is closed
func (f *DeltaFIFO) IsClosed() bool { func (f *DeltaFIFO) IsClosed() bool {
f.closedLock.Lock() f.lock.Lock()
defer f.closedLock.Unlock() defer f.lock.Unlock()
return f.closed return f.closed
} }
@ -472,7 +471,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted. // When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop(). // Which causes this loop to continue and return from the Pop().
if f.IsClosed() { if f.closed {
return nil, ErrFIFOClosed return nil, ErrFIFOClosed
} }

View File

@ -19,6 +19,7 @@ package cache
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"runtime"
"testing" "testing"
"time" "time"
) )
@ -645,3 +646,36 @@ func TestDeltaFIFO_HasSynced(t *testing.T) {
} }
} }
} }
// TestDeltaFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue
// should unblock and return after Close is called.
func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5)}
}),
})
c := make(chan struct{})
const jobs = 10
for i := 0; i < jobs; i++ {
go func() {
f.Pop(func(obj interface{}) error {
return nil
})
c <- struct{}{}
}()
}
runtime.Gosched()
f.Close()
for i := 0; i < jobs; i++ {
select {
case <-c:
case <-time.After(500 * time.Millisecond):
t.Fatalf("timed out waiting for Pop to return after Close")
}
}
}

13
tools/cache/fifo.go vendored
View File

@ -128,8 +128,7 @@ type FIFO struct {
// Indication the queue is closed. // Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations. // Currently, not used to gate any of CRED operations.
closed bool closed bool
closedLock sync.Mutex
} }
var ( var (
@ -138,8 +137,8 @@ var (
// Close the queue. // Close the queue.
func (f *FIFO) Close() { func (f *FIFO) Close() {
f.closedLock.Lock() f.lock.Lock()
defer f.closedLock.Unlock() defer f.lock.Unlock()
f.closed = true f.closed = true
f.cond.Broadcast() f.cond.Broadcast()
} }
@ -262,8 +261,8 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
// IsClosed checks if the queue is closed // IsClosed checks if the queue is closed
func (f *FIFO) IsClosed() bool { func (f *FIFO) IsClosed() bool {
f.closedLock.Lock() f.lock.Lock()
defer f.closedLock.Unlock() defer f.lock.Unlock()
if f.closed { if f.closed {
return true return true
} }
@ -284,7 +283,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted. // When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop(). // Which causes this loop to continue and return from the Pop().
if f.IsClosed() { if f.closed {
return nil, ErrFIFOClosed return nil, ErrFIFOClosed
} }

View File

@ -19,6 +19,7 @@ package cache
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"runtime"
"testing" "testing"
"time" "time"
) )
@ -278,3 +279,31 @@ func TestFIFO_HasSynced(t *testing.T) {
} }
} }
} }
// TestFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue
// should unblock and return after Close is called.
func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) {
f := NewFIFO(testFifoObjectKeyFunc)
c := make(chan struct{})
const jobs = 10
for i := 0; i < jobs; i++ {
go func() {
f.Pop(func(obj interface{}) error {
return nil
})
c <- struct{}{}
}()
}
runtime.Gosched()
f.Close()
for i := 0; i < jobs; i++ {
select {
case <-c:
case <-time.After(500 * time.Millisecond):
t.Fatalf("timed out waiting for Pop to return after Close")
}
}
}