Revert "pop respects the context"

This reverts commit 1c33d98762511c10f89c40358a1935250b03b0c8.

Kubernetes-commit: 74af3ac8ad1122528bb9971c3a2d282eff529beb
This commit is contained in:
Keisuke Ishigami 2025-06-26 23:49:34 +09:00 committed by Kubernetes Publisher
parent bc19363821
commit fc748aa158
4 changed files with 10 additions and 31 deletions

View File

@ -208,9 +208,9 @@ func (c *controller) processLoop(ctx context.Context) {
case <-ctx.Done():
return
default:
_, err := c.config.Queue.Pop(ctx, PopProcessFunc(c.config.Process))
_, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if errors.Is(err, ErrFIFOClosed) || errors.Is(err, ErrCtxDone) {
if err == ErrFIFOClosed {
return
}
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package cache
import (
"context"
"errors"
"fmt"
"sync"
@ -496,16 +495,11 @@ func (f *DeltaFIFO) IsClosed() bool {
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) {
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// if the length of the queue is 0 and ctx is done, return here
if ctx.Err() != nil {
return nil, ErrCtxDone
}
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().

21
tools/cache/fifo.go vendored
View File

@ -17,7 +17,6 @@ limitations under the License.
package cache
import (
"context"
"errors"
"sync"
@ -31,9 +30,6 @@ type PopProcessFunc func(obj interface{}, isInInitialList bool) error
// ErrFIFOClosed used when FIFO is closed
var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")
// ErrCtxDone is used when context is done
var ErrCtxDone = errors.New("DeltaFIFO: context done")
// Queue extends ReflectorStore with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
@ -42,8 +38,8 @@ var ErrCtxDone = errors.New("DeltaFIFO: context done")
type Queue interface {
ReflectorStore
// Pop blocks until there is at least one key to process, ctx is done,
// or the Queue is closed. In the latter case Pop returns with an error.
// Pop blocks until there is at least one key to process or the
// Queue is closed. In the latter case Pop returns with an error.
// In the former case Pop atomically picks one key to process,
// removes that (key, accumulator) association from the Store, and
// processes the accumulator. Pop returns the accumulator that
@ -52,7 +48,7 @@ type Queue interface {
// return that (key, accumulator) association to the Queue as part
// of the atomic processing and (b) return the inner error from
// Pop.
Pop(context.Context, PopProcessFunc) (interface{}, error)
Pop(PopProcessFunc) (interface{}, error)
// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
@ -70,9 +66,9 @@ type Queue interface {
//
// NOTE: This function is deprecated and may be removed in the future without
// additional warning.
func Pop(ctx context.Context, queue Queue) interface{} {
func Pop(queue Queue) interface{} {
var result interface{}
queue.Pop(ctx, func(obj interface{}, isInInitialList bool) error {
queue.Pop(func(obj interface{}, isInInitialList bool) error {
result = obj
return nil
})
@ -195,16 +191,11 @@ func (f *FIFO) IsClosed() bool {
// so if you don't successfully process it, it should be added back with
// AddIfNotPresent(). process function is called under lock, so it is safe
// update data structures in it that need to be in sync with the queue.
func (f *FIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) {
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// if the length of the queue is 0 and ctx is done, return here
if ctx.Err() != nil {
return nil, ErrCtxDone
}
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().

View File

@ -17,7 +17,6 @@ limitations under the License.
package cache
import (
"context"
"fmt"
"sync"
"time"
@ -193,16 +192,11 @@ func (f *RealFIFO) IsClosed() bool {
// The item is removed from the queue (and the store) before it is processed.
// process function is called under lock, so it is safe
// update data structures in it that need to be in sync with the queue.
func (f *RealFIFO) Pop(ctx context.Context, process PopProcessFunc) (interface{}, error) {
func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for len(f.items) == 0 {
// if the length of the queue is 0 and ctx is done, return here
if ctx.Err() != nil {
return nil, ErrCtxDone
}
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().