Merge pull request #91968 from MikeSpreitzer/fix91709

Remove contemplation of invariant violations from delta_fifo.go
This commit is contained in:
Kubernetes Prow Robot 2020-08-27 17:55:41 -07:00 committed by GitHub
commit 95fcb55b41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -145,7 +145,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas. List() returns
// the newest objects currently in the FIFO.
// the newest object from each accumulator in the FIFO.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
@ -161,12 +161,13 @@ type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
// `items` maps keys to Deltas.
// `queue` maintains FIFO order of keys for consumption in Pop().
// We maintain the property that keys in the `items` and `queue` are
// strictly 1:1 mapping, and that all Deltas in `items` should have
// at least one Delta.
// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas
// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
@ -376,8 +377,8 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj})
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
@ -389,10 +390,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// But if somehow it ever does return an empty list, then
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
@ -459,7 +464,7 @@ func (f *DeltaFIFO) IsClosed() bool {
return f.closed
}
// Pop blocks until an item is added to the queue, and then returns it. If
// Pop blocks until the queue has some items, and then returns one. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
@ -494,7 +499,8 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)