Ensure that processing does not block queue writers

Kubernetes-commit: 40c01b99a7bac05f6de438d67dd20a60333a0ac4
This commit is contained in:
Michael Aspinwall
2026-01-16 00:46:52 +00:00
committed by Kubernetes Publisher
parent ff70f47ee2
commit f5283a4822
4 changed files with 108 additions and 37 deletions

View File

@@ -76,6 +76,12 @@ const (
// GA: v1.35
InformerResourceVersion Feature = "InformerResourceVersion"
// owner: @michaelasp
// beta: v1.36
//
// Allow the FIFO to unlock while processing items to allow other goroutines to add items to the queue.
UnlockWhileProcessingFIFO Feature = "UnlockWhileProcessingFIFO"
// owner: @p0lyn0mial
// beta: v1.30
//
@@ -108,6 +114,9 @@ var defaultVersionedKubernetesFeatureGates = map[Feature]VersionedSpecs{
{Version: version.MustParse("1.30"), Default: false, PreRelease: Alpha},
{Version: version.MustParse("1.35"), Default: true, PreRelease: GA},
},
UnlockWhileProcessingFIFO: {
{Version: version.MustParse("1.36"), Default: true, PreRelease: Beta},
},
WatchListClient: {
{Version: version.MustParse("1.30"), Default: false, PreRelease: Beta},
{Version: version.MustParse("1.35"), Default: true, PreRelease: Beta},

23
tools/cache/fifo.go vendored
View File

@@ -47,7 +47,9 @@ type Queue interface {
// may return an ErrRequeue{inner} and in this case Pop will (a)
// return that (key, accumulator) association to the Queue as part
// of the atomic processing and (b) return the inner error from
// Pop.
// Pop. It is expected that the caller of Pop will be a single
// threaded consumer since otherwise it is possible for multiple
// PopProcessFuncs to be running simultaneously.
Pop(PopProcessFunc) (interface{}, error)
// HasSynced returns true if the first batch of keys have all been
@@ -77,25 +79,12 @@ type QueueWithBatch interface {
// is called when a batch is ready to be processed. The PopProcessFunc
// is called when a singleton item is ready to be processed. The
// ProcessBatchFunc and PopProcessFunc must do the same processing to
// ensure consistent behavior.
// ensure consistent behavior. It is expected that the caller of PopBatch
// will be a single threaded consumer since otherwise it is possible for
// multiple ProcessBatchFuncs/PopProcessFuncs to be running simultaneously.
PopBatch(processBatch ProcessBatchFunc, processSingle PopProcessFunc) error
}
// Pop is helper function for popping from Queue.
// WARNING: Do NOT use this function in non-test code to avoid races
// unless you really really really really know what you are doing.
//
// NOTE: This function is deprecated and may be removed in the future without
// additional warning.
func Pop(queue Queue) interface{} {
var result interface{}
queue.Pop(func(obj interface{}, isInInitialList bool) error {
result = obj
return nil
})
return result
}
// FIFO is a Queue in which (a) each accumulator is simply the most
// recently provided object and (b) the collection of keys to process
// is a FIFO. The accumulators all start out empty, and deleting an

View File

@@ -67,6 +67,21 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
return item, exists, nil
}
// Pop is helper function for popping from Queue.
// WARNING: Do NOT use this function in non-test code to avoid races
// unless you really really really really know what you are doing.
//
// NOTE: This function is deprecated and may be removed in the future without
// additional warning.
func Pop(queue Queue) interface{} {
var result interface{}
queue.Pop(func(obj interface{}, isInInitialList bool) error {
result = obj
return nil
})
return result
}
func testFifoObjectKeyFunc(obj interface{}) (string, error) {
return obj.(testFifoObject).name, nil
}

View File

@@ -50,6 +50,12 @@ type RealFIFOOptions struct {
// atomically for Replace and Resync operations.
// If AtomicEvents is true, KnownObjects must be nil.
AtomicEvents bool
// UnlockWhileProcessing is used to specify whether the RealFIFO can unlock
// the lock while processing events. If it is set, the lock can be unlocked
// while processing events to allow other goroutines to add items to the queue.
// If UnlockWhileProcessing is true, AtomicEvents must be true as well.
UnlockWhileProcessing bool
}
const (
@@ -102,6 +108,11 @@ type RealFIFO struct {
// * a single ReplacedAll event will be emitted instead of multiple Replace events
// * a single SyncAll event will be emitted instead of multiple Sync events
emitAtomicEvents bool
// unlockWhileProcessing defines whether we can unlock while processing events.
// This may only be set if emitAtomicEvents is true. If unlockWhileProcessing is true,
// Pop and PopBatch must be called from a single threaded consumer.
unlockWhileProcessing bool
}
// ReplacedAllInfo is the object associated with a Delta of type=ReplacedAll
@@ -285,10 +296,14 @@ func (f *RealFIFO) IsClosed() bool {
}
// Pop waits until an item is ready and processes it. If multiple items are
// ready, they are returned in the order in which they were added/updated.
// The item is removed from the queue (and the store) before it is processed.
// process function is called under lock, so it is safe
// update data structures in it that need to be in sync with the queue.
// ready, they are returned in the order in which they were added/updated. The
// item is removed from the queue (and the store) before it is processed. The
// process function is only guaranteed to be called under lock if
// UnlockWhileProcessing is false. If the process function is updating data
// structures that need to be in sync with the queue, ensure
// UnlockWhileProcessing is false. It is expected that the caller of Pop will be
// a single threaded consumer since otherwise it is possible for multiple
// PopProcessFuncs to be running simultaneously.
func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
@@ -309,9 +324,14 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// The underlying array still exists and references this object, so the object will not be garbage collected unless we zero the reference.
f.items[0] = Delta{}
f.items = f.items[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// Decrement initialPopulationCount if needed.
// This is done in a defer so we only do this *after* processing is complete,
// so concurrent calls to hasSynced will not incorrectly return true while processing is still happening.
defer func() {
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
}()
// Only log traces if the queue depth is greater than 10 and it takes more than
// 100 milliseconds to process one item from the queue.
@@ -327,11 +347,28 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer trace.LogIfLong(100 * time.Millisecond)
}
// we wrap in Deltas here to be compatible with preview Pop functions and those interpreting the return value.
err := process(Deltas{item}, isInInitialList)
// Process the item, this may unlock the lock, and allow other goroutines to add items to the queue.
err := f.whileProcessing_locked(func() error {
// we wrap in Deltas here to be compatible with preview Pop functions and those interpreting the return value.
return process(Deltas{item}, isInInitialList)
})
return Deltas{item}, err
}
// whileProcessing_locked calls the `process` function.
// The lock must be held before calling `whileProcessing_locked`, and is held when `whileProcessing_locked` returns.
// whileProcessing_locked releases the lock during the call to `process` if f.unlockWhileProcessing is true and the f.items queue is not too long.
func (f *RealFIFO) whileProcessing_locked(process func() error) error {
// Unlock before calling `process` so new items can be enqueued during processing.
// Only do this if the queue contains less than 2 full batches of items,
// to prevent the queue from growing unboundedly.
if f.unlockWhileProcessing && len(f.items) < f.batchSize*2 {
f.lock.Unlock()
defer f.lock.Lock()
}
return process()
}
// batchable stores the delta types that can be batched
var batchable = map[DeltaType]bool{
Sync: true,
@@ -343,6 +380,13 @@ var batchable = map[DeltaType]bool{
// PopBatch pops as many items as possible to be processed as a batch using processBatch,
// or pop a single item using processSingle if multiple items cannot be batched.
//
// The processBatch and processSingle functions are only guaranteed to be called
// under lock if UnlockWhileProcessing is false. If the process functions are
// updating data structures that need to be in sync with the queue, ensure
// UnlockWhileProcessing is false. It is expected that the caller of PopBatch
// will be a single threaded consumer, since otherwise it is possible for
// multiple ProcessBatchFunc or PopProcessFunc's to be running simultaneously.
func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProcessFunc) error {
f.lock.Lock()
defer f.lock.Unlock()
@@ -397,10 +441,16 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
unique.Insert(id)
moveDeltaToProcessList(i)
}
if f.initialPopulationCount > 0 {
f.initialPopulationCount -= len(deltas)
}
f.items = f.items[len(deltas):]
// Decrement initialPopulationCount if needed.
// This is done in a defer so we only do this *after* processing is complete,
// so concurrent calls to hasSynced will not incorrectly return true while processing is still happening.
defer func() {
if f.initialPopulationCount > 0 {
f.initialPopulationCount -= len(deltas)
}
}()
// Only log traces if the queue depth is greater than 10 and it takes more than
// 100 milliseconds to process one item from the queue (with a max of 1 second for the whole batch)
@@ -418,9 +468,13 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
}
if len(deltas) == 1 {
return processSingle(Deltas{deltas[0]}, isInInitialList)
return f.whileProcessing_locked(func() error {
return processSingle(Deltas{deltas[0]}, isInInitialList)
})
}
return processBatch(deltas, isInInitialList)
return f.whileProcessing_locked(func() error {
return processBatch(deltas, isInInitialList)
})
}
// Replace
@@ -641,18 +695,22 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
panic("coding error: knownObjects must not be provided when AtomicEvents is true")
}
} else {
if opts.UnlockWhileProcessing {
panic("coding error: UnlockWhileProcessing must be false when AtomicEvents is false")
}
if opts.KnownObjects == nil {
panic("coding error: knownObjects must be provided when AtomicEvents is false")
}
}
f := &RealFIFO{
items: make([]Delta, 0, 10),
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
transformer: opts.Transformer,
batchSize: defaultBatchSize,
emitAtomicEvents: opts.AtomicEvents,
items: make([]Delta, 0, 10),
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
transformer: opts.Transformer,
batchSize: defaultBatchSize,
emitAtomicEvents: opts.AtomicEvents,
unlockWhileProcessing: opts.UnlockWhileProcessing,
}
f.cond.L = &f.lock