diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 2159014d606..06bc04f85ec 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -99,6 +99,10 @@ func New(c *Config) Controller { // Run blocks; call via go. func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + go func() { + <-stopCh + c.config.Queue.Close() + }() r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, @@ -142,6 +146,9 @@ func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { + if err == FIFOClosedError { + return + } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 57969d8ea66..a71db60488d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -118,6 +118,12 @@ type DeltaFIFO struct { // purpose of figuring out which items have been deleted // when Replace() or Delete() is called. knownObjects KeyListerGetter + + // Indication the queue is closed. + // Used to indicate a queue is closed so a control loop can exit when a queue is empty. + // Currently, not used to gate any of CRED operations. + closed bool + closedLock sync.Mutex } var ( @@ -132,6 +138,14 @@ var ( ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key") ) +// Close the queue. +func (f *DeltaFIFO) Close() { + f.closedLock.Lock() + defer f.closedLock.Unlock() + f.closed = true + f.cond.Broadcast() +} + // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or // DeletedFinalStateUnknown objects. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { @@ -387,6 +401,16 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err return d, exists, nil } +// Checks if the queue is closed +func (f *DeltaFIFO) IsClosed() bool { + f.closedLock.Lock() + defer f.closedLock.Unlock() + if f.closed { + return true + } + return false +} + // Pop blocks until an item is added to the queue, and then returns it. If // multiple items are ready, they are returned in the order in which they were // added/updated. The item is removed from the queue (and the store) before it @@ -404,6 +428,13 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { defer f.lock.Unlock() for { for len(f.queue) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the f.closed is set and the condition is broadcasted. + // Which causes this loop to continue and return from the Pop(). + if f.IsClosed() { + return nil, FIFOClosedError + } + f.cond.Wait() } id := f.queue[0] diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo.go b/staging/src/k8s.io/client-go/tools/cache/fifo.go index 672facbc43b..3f6e2a9480a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "errors" "sync" "k8s.io/apimachinery/pkg/util/sets" @@ -33,6 +34,8 @@ type ErrRequeue struct { Err error } +var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue") + func (e ErrRequeue) Error() string { if e.Err == nil { return "the popped item should be requeued without returning an error" @@ -58,6 +61,9 @@ type Queue interface { // Return true if the first batch of items has been popped HasSynced() bool + + // Close queue + Close() } // Helper function for popping from Queue. @@ -100,12 +106,26 @@ type FIFO struct { // keyFunc is used to make the key used for queued item insertion and retrieval, and // should be deterministic. keyFunc KeyFunc + + // Indication the queue is closed. + // Used to indicate a queue is closed so a control loop can exit when a queue is empty. + // Currently, not used to gate any of CRED operations. + closed bool + closedLock sync.Mutex } var ( _ = Queue(&FIFO{}) // FIFO is a Queue ) +// Close the queue. +func (f *FIFO) Close() { + f.closedLock.Lock() + defer f.closedLock.Unlock() + f.closed = true + f.cond.Broadcast() +} + // Return true if an Add/Update/Delete/AddIfNotPresent are called first, // or an Update called first but the first batch of items inserted by Replace() has been popped func (f *FIFO) HasSynced() bool { @@ -222,6 +242,16 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { return item, exists, nil } +// Checks if the queue is closed +func (f *FIFO) IsClosed() bool { + f.closedLock.Lock() + defer f.closedLock.Unlock() + if f.closed { + return true + } + return false +} + // Pop waits until an item is ready and processes it. If multiple items are // ready, they are returned in the order in which they were added/updated. // The item is removed from the queue (and the store) before it is processed, @@ -233,6 +263,13 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { defer f.lock.Unlock() for { for len(f.queue) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the f.closed is set and the condition is broadcasted. + // Which causes this loop to continue and return from the Pop(). + if f.IsClosed() { + return nil, FIFOClosedError + } + f.cond.Wait() } id := f.queue[0]