From 5efd727d112206ef9a8ede93c5878b0d40707ae9 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Tue, 9 Jun 2020 18:57:28 -0400 Subject: [PATCH 1/3] Remove contemplation of invariant violations from delta_fifo.go Some comments and code incorrectly contemplated violating the invariant that a key is in `f.items` if and only if it is in `f.queue`. Also fixed up some comment wording. --- .../client-go/tools/cache/delta_fifo.go | 74 +++++++++---------- 1 file changed, 35 insertions(+), 39 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 2774f4f2112..b32806a1968 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -145,7 +145,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { // DeltaFIFO's Pop(), Get(), and GetByKey() methods return // interface{} to satisfy the Store/Queue interfaces, but they // will always return an object of type Deltas. List() returns -// the newest objects currently in the FIFO. +// the newest object from each accumulator in the FIFO. // // A DeltaFIFO's knownObjects KeyListerGetter provides the abilities // to list Store keys and to get objects by Store key. The objects in @@ -161,12 +161,13 @@ type DeltaFIFO struct { lock sync.RWMutex cond sync.Cond - // `items` maps keys to Deltas. - // `queue` maintains FIFO order of keys for consumption in Pop(). - // We maintain the property that keys in the `items` and `queue` are - // strictly 1:1 mapping, and that all Deltas in `items` should have - // at least one Delta. + // `items` maps a key to a Deltas. + // Each such Deltas has at least one Delta. items map[string]Deltas + + // `queue` maintains FIFO order of keys for consumption in Pop(). + // There are no duplicates in `queue`. + // A key is in `queue` if and only if it is in `items`. 
queue []string // populated is true if the first batch of items inserted by Replace() has been populated @@ -389,10 +390,7 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err } else { // This never happens, because dedupDeltas never returns an empty list // when given a non-empty list (as it is here). - // But if somehow it ever does return an empty list, then - // We need to remove this from our map (extra items in the queue are - // ignored if they are not in the map). - delete(f.items, id) + panic(fmt.Sprintf("Impossible dedupDeltas for id=%q: old items=%#+v, obj=%#+v", id, f.items[id], obj)) } return nil } @@ -476,37 +474,35 @@ func (f *DeltaFIFO) IsClosed() bool { func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() - for { - for len(f.queue) == 0 { - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. - // When Close() is called, the f.closed is set and the condition is broadcasted. - // Which causes this loop to continue and return from the Pop(). - if f.closed { - return nil, ErrFIFOClosed - } + for len(f.queue) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the f.closed is set and the condition is broadcasted. + // Which causes this loop to continue and return from the Pop(). + if f.closed { + return nil, ErrFIFOClosed + } - f.cond.Wait() - } - id := f.queue[0] - f.queue = f.queue[1:] - if f.initialPopulationCount > 0 { - f.initialPopulationCount-- - } - item, ok := f.items[id] - if !ok { - // Item may have been deleted subsequently. - continue - } - delete(f.items, id) - err := process(item) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } - // Don't need to copyDeltas here, because we're transferring - // ownership to the caller. 
- return item, err + f.cond.Wait() } + id := f.queue[0] + f.queue = f.queue[1:] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } + item, ok := f.items[id] + if !ok { + // This should never happen + panic(fmt.Sprintf("Inconceivable! %q was in f.queue but not f.items!", id)) + } + delete(f.items, id) + err := process(item) + if e, ok := err.(ErrRequeue); ok { + f.addIfNotPresent(id, item) + err = e.Err + } + // Don't need to copyDeltas here, because we're transferring + // ownership to the caller. + return item, err } // Replace atomically does two things: (1) it adds the given objects From abcd0f82c5634c7a23b9390172b7c6ae907d84f7 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Thu, 2 Jul 2020 23:23:59 -0400 Subject: [PATCH 2/3] Repair instead of panic when data corruption detected in DeltaFIFO --- .../client-go/tools/cache/delta_fifo.go | 101 ++++++++++++------ 1 file changed, 70 insertions(+), 31 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index b32806a1968..37e17717b07 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -377,8 +377,8 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err if err != nil { return KeyError{obj, err} } - - newDeltas := append(f.items[id], Delta{actionType, obj}) + oldDeltas := f.items[id] + newDeltas := append(oldDeltas, Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { @@ -390,7 +390,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err } else { // This never happens, because dedupDeltas never returns an empty list // when given a non-empty list (as it is here). - panic(fmt.Sprintf("Impossible dedupDeltas for id=%q: old items=%#+v, obj=%#+v", id, f.items[id], obj)) + // If somehow it happens anyway, deal with it but complain. 
+ if oldDeltas == nil { + klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) + return nil + } + klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; removing from queue", id, oldDeltas, obj) + f.items[id] = newDeltas + f.filterQueueLocked() } return nil } @@ -457,7 +464,7 @@ func (f *DeltaFIFO) IsClosed() bool { return f.closed } -// Pop blocks until an item is added to the queue, and then returns it. If +// Pop blocks until the queue has some items, and then returns one. If // multiple items are ready, they are returned in the order in which they were // added/updated. The item is removed from the queue (and the store) before it // is returned, so if you don't successfully process it, you need to add it back @@ -474,35 +481,38 @@ func (f *DeltaFIFO) IsClosed() bool { func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() - for len(f.queue) == 0 { - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. - // When Close() is called, the f.closed is set and the condition is broadcasted. - // Which causes this loop to continue and return from the Pop(). - if f.closed { - return nil, ErrFIFOClosed - } + for { + for len(f.queue) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the f.closed is set and the condition is broadcasted. + // Which causes this loop to continue and return from the Pop(). + if f.closed { + return nil, ErrFIFOClosed + } - f.cond.Wait() + f.cond.Wait() + } + id := f.queue[0] + f.queue = f.queue[1:] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } + item, ok := f.items[id] + if !ok { + // This should never happen + klog.Errorf("Inconceivable! 
%q was in f.queue but not f.items; ignoring.", id) + continue + } + delete(f.items, id) + err := process(item) + if e, ok := err.(ErrRequeue); ok { + f.addIfNotPresent(id, item) + err = e.Err + } + // Don't need to copyDeltas here, because we're transferring + // ownership to the caller. + return item, err } - id := f.queue[0] - f.queue = f.queue[1:] - if f.initialPopulationCount > 0 { - f.initialPopulationCount-- - } - item, ok := f.items[id] - if !ok { - // This should never happen - panic(fmt.Sprintf("Inconceivable! %q was in f.queue but not f.items!", id)) - } - delete(f.items, id) - err := process(item) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } - // Don't need to copyDeltas here, because we're transferring - // ownership to the caller. - return item, err } // Replace atomically does two things: (1) it adds the given objects @@ -646,6 +656,35 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error { return nil } +// A partial repair function. Removes keys from f.queue that +// correspond to nothing in f.items. Removes keys from both when the +// entry in f.items has zero length. Ensures each key appears at most +// once in queue. Must be called with the lock held. +func (f *DeltaFIFO) filterQueueLocked() { + newQueue := make([]string, 0, len(f.queue)) + newItems := make(map[string]Deltas, len(f.items)) + for _, key := range f.queue { + if _, exists := newItems[key]; exists { + klog.Errorf("Removing duplicate key %q", key) + continue + } + deltas, exists := f.items[key] + if !exists { + klog.Errorf("Removing key %q because it has no Deltas", key) + continue + } + if len(deltas) == 0 { + klog.Errorf("Removing key %q because it has zero-length Deltas", key) + continue + } + newQueue = append(newQueue, key) + newItems[key] = deltas + } + klog.Info("Finished repair of queue") + f.queue = newQueue + f.items = newItems +} + // A KeyListerGetter is anything that knows how to list its keys and look up by key. 
type KeyListerGetter interface { KeyLister From a39481a4f6cf33f9bf4555adcffa28077863e7a9 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Wed, 8 Jul 2020 01:15:51 -0400 Subject: [PATCH 3/3] Replaced repair with returning error, in delta_fifo.go When dedupDeltas does the impossible and the key is already queued, return an error rather than maintain the data structure invariants. --- .../client-go/tools/cache/delta_fifo.go | 33 ++----------------- 1 file changed, 2 insertions(+), 31 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 37e17717b07..148b478d584 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -395,9 +395,9 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) return nil } - klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; removing from queue", id, oldDeltas, obj) + klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) f.items[id] = newDeltas - f.filterQueueLocked() + return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } return nil } @@ -656,35 +656,6 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error { return nil } -// A partial repair function. Removes keys from f.queue that -// correspond to nothing in f.items. Removes keys from both when the -// entry in f.items has zero length. Ensures each key appears at most -// once in queue. Must be called with the lock held. 
-func (f *DeltaFIFO) filterQueueLocked() { - newQueue := make([]string, 0, len(f.queue)) - newItems := make(map[string]Deltas, len(f.items)) - for _, key := range f.queue { - if _, exists := newItems[key]; exists { - klog.Errorf("Removing duplicate key %q", key) - continue - } - deltas, exists := f.items[key] - if !exists { - klog.Errorf("Removing key %q because it has no Deltas", key) - continue - } - if len(deltas) == 0 { - klog.Errorf("Removing key %q because it has zero-length Deltas", key) - continue - } - newQueue = append(newQueue, key) - newItems[key] = deltas - } - klog.Info("Finished repair of queue") - f.queue = newQueue - f.items = newItems -} - // A KeyListerGetter is anything that knows how to list its keys and look up by key. type KeyListerGetter interface { KeyLister