From fc748aa158fcf9cf1a0df1a87e2e9781d768aac9 Mon Sep 17 00:00:00 2001 From: Keisuke Ishigami Date: Thu, 26 Jun 2025 23:49:34 +0900 Subject: [PATCH] Revert "pop respects the context" This reverts commit 1c33d98762511c10f89c40358a1935250b03b0c8. Kubernetes-commit: 74af3ac8ad1122528bb9971c3a2d282eff529beb --- tools/cache/controller.go | 4 ++-- tools/cache/delta_fifo.go | 8 +------- tools/cache/fifo.go | 21 ++++++--------------- tools/cache/the_real_fifo.go | 8 +------- 4 files changed, 10 insertions(+), 31 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 8bafcb67..f0a483b6 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -208,9 +208,9 @@ func (c *controller) processLoop(ctx context.Context) { case <-ctx.Done(): return default: - _, err := c.config.Queue.Pop(ctx, PopProcessFunc(c.config.Process)) + _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { - if errors.Is(err, ErrFIFOClosed) || errors.Is(err, ErrCtxDone) { + if err == ErrFIFOClosed { return } } diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index bfa4deb1..9d9e238c 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "errors" "fmt" "sync" @@ -496,16 +495,11 @@ func (f *DeltaFIFO) IsClosed() bool { // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. -func (f *DeltaFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { +func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { - // if the length of the queue is 0 and ctx is done, return here - if ctx.Err() != nil { - return nil, ErrCtxDone - } - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index fde34edc..5c2ca900 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "errors" "sync" @@ -31,9 +30,6 @@ type PopProcessFunc func(obj interface{}, isInInitialList bool) error // ErrFIFOClosed used when FIFO is closed var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") -// ErrCtxDone is used when context is done -var ErrCtxDone = errors.New("DeltaFIFO: context done") - // Queue extends ReflectorStore with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. @@ -42,8 +38,8 @@ var ErrCtxDone = errors.New("DeltaFIFO: context done") type Queue interface { ReflectorStore - // Pop blocks until there is at least one key to process, ctx is done, - // or the Queue is closed. In the latter case Pop returns with an error. + // Pop blocks until there is at least one key to process or the + // Queue is closed. In the latter case Pop returns with an error. // In the former case Pop atomically picks one key to process, // removes that (key, accumulator) association from the Store, and // processes the accumulator. Pop returns the accumulator that @@ -52,7 +48,7 @@ type Queue interface { // return that (key, accumulator) association to the Queue as part // of the atomic processing and (b) return the inner error from // Pop. - Pop(context.Context, PopProcessFunc) (interface{}, error) + Pop(PopProcessFunc) (interface{}, error) // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace @@ -70,9 +66,9 @@ type Queue interface { // // NOTE: This function is deprecated and may be removed in the future without // additional warning. -func Pop(ctx context.Context, queue Queue) interface{} { +func Pop(queue Queue) interface{} { var result interface{} - queue.Pop(ctx, func(obj interface{}, isInInitialList bool) error { + queue.Pop(func(obj interface{}, isInInitialList bool) error { result = obj return nil }) @@ -195,16 +191,11 @@ func (f *FIFO) IsClosed() bool { // so if you don't successfully process it, it should be added back with // AddIfNotPresent(). process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. -func (f *FIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { +func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { - // if the length of the queue is 0 and ctx is done, return here - if ctx.Err() != nil { - return nil, ErrCtxDone - } - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go index 6d8f76ec..ef322bea 100644 --- a/tools/cache/the_real_fifo.go +++ b/tools/cache/the_real_fifo.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "fmt" "sync" "time" @@ -193,16 +192,11 @@ func (f *RealFIFO) IsClosed() bool { // The item is removed from the queue (and the store) before it is processed. // process function is called under lock, so it is safe // update data structures in it that need to be in sync with the queue. -func (f *RealFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) { +func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for len(f.items) == 0 { - // if the length of the queue is 0 and ctx is done, return here - if ctx.Err() != nil { - return nil, ErrCtxDone - } - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop().