From d56ed5816f9fe01c7df0afdc27760e393b6eb521 Mon Sep 17 00:00:00 2001 From: Keisuke Ishigami Date: Sat, 14 Jun 2025 21:59:56 +0900 Subject: [PATCH] pop respects the context Kubernetes-commit: 1c33d98762511c10f89c40358a1935250b03b0c8 --- tools/cache/controller.go | 4 ++-- tools/cache/delta_fifo.go | 8 +++++++- tools/cache/fifo.go | 21 +++++++++++++++------ tools/cache/the_real_fifo.go | 8 +++++++- 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index f0a483b6..8bafcb67 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -208,9 +208,9 @@ func (c *controller) processLoop(ctx context.Context) { case <-ctx.Done(): return default: - _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) + _, err := c.config.Queue.Pop(ctx, PopProcessFunc(c.config.Process)) if err != nil { - if err == ErrFIFOClosed { + if errors.Is(err, ErrFIFOClosed) || errors.Is(err, ErrCtxDone) { return } } diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 9d9e238c..bfa4deb1 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "fmt" "sync" @@ -495,11 +496,16 @@ func (f *DeltaFIFO) IsClosed() bool { // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. -func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { +func (f *DeltaFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { + // if the length of the queue is 0 and ctx is done, return here + if ctx.Err() != nil { + return nil, ErrCtxDone + } + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 5c2ca900..fde34edc 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "errors" "sync" @@ -30,6 +31,9 @@ type PopProcessFunc func(obj interface{}, isInInitialList bool) error // ErrFIFOClosed used when FIFO is closed var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") +// ErrCtxDone is used when context is done +var ErrCtxDone = errors.New("DeltaFIFO: context done") + // Queue extends ReflectorStore with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. @@ -38,8 +42,8 @@ var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") type Queue interface { ReflectorStore - // Pop blocks until there is at least one key to process or the - // Queue is closed. In the latter case Pop returns with an error. + // Pop blocks until there is at least one key to process, ctx is done, + // or the Queue is closed. In the latter case Pop returns with an error. // In the former case Pop atomically picks one key to process, // removes that (key, accumulator) association from the Store, and // processes the accumulator. Pop returns the accumulator that @@ -48,7 +52,7 @@ type Queue interface { // return that (key, accumulator) association to the Queue as part // of the atomic processing and (b) return the inner error from // Pop. - Pop(PopProcessFunc) (interface{}, error) + Pop(context.Context, PopProcessFunc) (interface{}, error) // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace @@ -66,9 +70,9 @@ type Queue interface { // // NOTE: This function is deprecated and may be removed in the future without // additional warning. -func Pop(queue Queue) interface{} { +func Pop(ctx context.Context, queue Queue) interface{} { var result interface{} - queue.Pop(func(obj interface{}, isInInitialList bool) error { + queue.Pop(ctx, func(obj interface{}, isInInitialList bool) error { result = obj return nil }) @@ -191,11 +195,16 @@ func (f *FIFO) IsClosed() bool { // so if you don't successfully process it, it should be added back with // AddIfNotPresent(). process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. -func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { +func (f *FIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { + // if the length of the queue is 0 and ctx is done, return here + if ctx.Err() != nil { + return nil, ErrCtxDone + } + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index ef322bea..6d8f76ec 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "sync" "time" @@ -192,11 +193,16 @@ func (f *RealFIFO) IsClosed() bool { // The item is removed from the queue (and the store) before it is processed. // process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. -func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { +func (f *RealFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for len(f.items) == 0 { + // if the length of the queue is 0 and ctx is done, return here + if ctx.Err() != nil { + return nil, ErrCtxDone + } + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop().