mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-14 06:15:45 +00:00
Remove contemplation of invariant violations from delta_fifo.go
Some comments and code incorrectly contemplated violating the invariant that a keys is in `f.items` if and only if it is in `f.queue`. Also fixed up some comment wording.
This commit is contained in:
parent
6ac3ca4b17
commit
5efd727d11
@ -145,7 +145,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
|||||||
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
|
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
|
||||||
// interface{} to satisfy the Store/Queue interfaces, but they
|
// interface{} to satisfy the Store/Queue interfaces, but they
|
||||||
// will always return an object of type Deltas. List() returns
|
// will always return an object of type Deltas. List() returns
|
||||||
// the newest objects currently in the FIFO.
|
// the newest object from each accumulator in the FIFO.
|
||||||
//
|
//
|
||||||
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
|
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
|
||||||
// to list Store keys and to get objects by Store key. The objects in
|
// to list Store keys and to get objects by Store key. The objects in
|
||||||
@ -161,12 +161,13 @@ type DeltaFIFO struct {
|
|||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
cond sync.Cond
|
cond sync.Cond
|
||||||
|
|
||||||
// `items` maps keys to Deltas.
|
// `items` maps a key to a Deltas.
|
||||||
// `queue` maintains FIFO order of keys for consumption in Pop().
|
// Each such Deltas has at least one Delta.
|
||||||
// We maintain the property that keys in the `items` and `queue` are
|
|
||||||
// strictly 1:1 mapping, and that all Deltas in `items` should have
|
|
||||||
// at least one Delta.
|
|
||||||
items map[string]Deltas
|
items map[string]Deltas
|
||||||
|
|
||||||
|
// `queue` maintains FIFO order of keys for consumption in Pop().
|
||||||
|
// There are no duplicates in `queue`.
|
||||||
|
// A key is in `queue` if and only if it is in `items`.
|
||||||
queue []string
|
queue []string
|
||||||
|
|
||||||
// populated is true if the first batch of items inserted by Replace() has been populated
|
// populated is true if the first batch of items inserted by Replace() has been populated
|
||||||
@ -389,10 +390,7 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
|||||||
} else {
|
} else {
|
||||||
// This never happens, because dedupDeltas never returns an empty list
|
// This never happens, because dedupDeltas never returns an empty list
|
||||||
// when given a non-empty list (as it is here).
|
// when given a non-empty list (as it is here).
|
||||||
// But if somehow it ever does return an empty list, then
|
panic(fmt.Sprintf("Impossible dedupDeltas for id=%q: old items=%#+v, obj=%#+v", id, f.items[id], obj))
|
||||||
// We need to remove this from our map (extra items in the queue are
|
|
||||||
// ignored if they are not in the map).
|
|
||||||
delete(f.items, id)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -476,37 +474,35 @@ func (f *DeltaFIFO) IsClosed() bool {
|
|||||||
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
for {
|
for len(f.queue) == 0 {
|
||||||
for len(f.queue) == 0 {
|
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
|
||||||
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
|
// When Close() is called, the f.closed is set and the condition is broadcasted.
|
||||||
// When Close() is called, the f.closed is set and the condition is broadcasted.
|
// Which causes this loop to continue and return from the Pop().
|
||||||
// Which causes this loop to continue and return from the Pop().
|
if f.closed {
|
||||||
if f.closed {
|
return nil, ErrFIFOClosed
|
||||||
return nil, ErrFIFOClosed
|
}
|
||||||
}
|
|
||||||
|
|
||||||
f.cond.Wait()
|
f.cond.Wait()
|
||||||
}
|
|
||||||
id := f.queue[0]
|
|
||||||
f.queue = f.queue[1:]
|
|
||||||
if f.initialPopulationCount > 0 {
|
|
||||||
f.initialPopulationCount--
|
|
||||||
}
|
|
||||||
item, ok := f.items[id]
|
|
||||||
if !ok {
|
|
||||||
// Item may have been deleted subsequently.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
delete(f.items, id)
|
|
||||||
err := process(item)
|
|
||||||
if e, ok := err.(ErrRequeue); ok {
|
|
||||||
f.addIfNotPresent(id, item)
|
|
||||||
err = e.Err
|
|
||||||
}
|
|
||||||
// Don't need to copyDeltas here, because we're transferring
|
|
||||||
// ownership to the caller.
|
|
||||||
return item, err
|
|
||||||
}
|
}
|
||||||
|
id := f.queue[0]
|
||||||
|
f.queue = f.queue[1:]
|
||||||
|
if f.initialPopulationCount > 0 {
|
||||||
|
f.initialPopulationCount--
|
||||||
|
}
|
||||||
|
item, ok := f.items[id]
|
||||||
|
if !ok {
|
||||||
|
// This should never happen
|
||||||
|
panic(fmt.Sprintf("Inconceivable! %q was in f.queue but not f.items!", id))
|
||||||
|
}
|
||||||
|
delete(f.items, id)
|
||||||
|
err := process(item)
|
||||||
|
if e, ok := err.(ErrRequeue); ok {
|
||||||
|
f.addIfNotPresent(id, item)
|
||||||
|
err = e.Err
|
||||||
|
}
|
||||||
|
// Don't need to copyDeltas here, because we're transferring
|
||||||
|
// ownership to the caller.
|
||||||
|
return item, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replace atomically does two things: (1) it adds the given objects
|
// Replace atomically does two things: (1) it adds the given objects
|
||||||
|
Loading…
Reference in New Issue
Block a user