From a166e2578b8608e712251af674014c771c39ea66 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Thu, 2 Jul 2020 23:23:59 -0400 Subject: [PATCH] Repair instead of panic when data corruption detected in DeltaFIFO Kubernetes-commit: abcd0f82c5634c7a23b9390172b7c6ae907d84f7 --- tools/cache/delta_fifo.go | 101 ++++++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 31 deletions(-) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index b32806a1..37e17717 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -377,8 +377,8 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err if err != nil { return KeyError{obj, err} } - - newDeltas := append(f.items[id], Delta{actionType, obj}) + oldDeltas := f.items[id] + newDeltas := append(oldDeltas, Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { @@ -390,7 +390,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err } else { // This never happens, because dedupDeltas never returns an empty list // when given a non-empty list (as it is here). - panic(fmt.Sprintf("Impossible dedupDeltas for id=%q: old items=%#+v, obj=%#+v", id, f.items[id], obj)) + // If somehow it happens anyway, deal with it but complain. + if oldDeltas == nil { + klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) + return nil + } + klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; removing from queue", id, oldDeltas, obj) + f.items[id] = newDeltas + f.filterQueueLocked() } return nil } @@ -457,7 +464,7 @@ func (f *DeltaFIFO) IsClosed() bool { return f.closed } -// Pop blocks until an item is added to the queue, and then returns it. If +// Pop blocks until the queue has some items, and then returns one. If // multiple items are ready, they are returned in the order in which they were // added/updated. The item is removed from the queue (and the store) before it // is returned, so if you don't successfully process it, you need to add it back @@ -474,35 +481,38 @@ func (f *DeltaFIFO) IsClosed() bool { func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() - for len(f.queue) == 0 { - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. - // When Close() is called, the f.closed is set and the condition is broadcasted. - // Which causes this loop to continue and return from the Pop(). - if f.closed { - return nil, ErrFIFOClosed - } + for { + for len(f.queue) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the f.closed is set and the condition is broadcasted. + // Which causes this loop to continue and return from the Pop(). + if f.closed { + return nil, ErrFIFOClosed + } - f.cond.Wait() + f.cond.Wait() + } + id := f.queue[0] + f.queue = f.queue[1:] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } + item, ok := f.items[id] + if !ok { + // This should never happen + klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id) + continue + } + delete(f.items, id) + err := process(item) + if e, ok := err.(ErrRequeue); ok { + f.addIfNotPresent(id, item) + err = e.Err + } + // Don't need to copyDeltas here, because we're transferring + // ownership to the caller. + return item, err } - id := f.queue[0] - f.queue = f.queue[1:] - if f.initialPopulationCount > 0 { - f.initialPopulationCount-- - } - item, ok := f.items[id] - if !ok { - // This should never happen - panic(fmt.Sprintf("Inconceivable! %q was in f.queue but not f.items!", id)) - } - delete(f.items, id) - err := process(item) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } - // Don't need to copyDeltas here, because we're transferring - // ownership to the caller. - return item, err } // Replace atomically does two things: (1) it adds the given objects @@ -646,6 +656,35 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error { return nil } +// A partial repair function. Removes keys from f.queue that +// correspond to nothing in f.items. Removes keys from both when the +// entry in f.items has zero length. Ensures each key appears at most +// once in queue. Must be called with the lock held. +func (f *DeltaFIFO) filterQueueLocked() { + newQueue := make([]string, 0, len(f.queue)) + newItems := make(map[string]Deltas, len(f.items)) + for _, key := range f.queue { + if _, exists := newItems[key]; exists { + klog.Errorf("Removing duplicate key %q", key) + continue + } + deltas, exists := f.items[key] + if !exists { + klog.Errorf("Removing key %q because it has no Deltas", key) + continue + } + if len(deltas) == 0 { + klog.Errorf("Removing key %q because it has zero-length Deltas", key) + continue + } + newQueue = append(newQueue, key) + newItems[key] = deltas + } + klog.Info("Finished repair of queue") + f.queue = newQueue + f.items = newItems +} + // A KeyListerGetter is anything that knows how to list its keys and look up by key. type KeyListerGetter interface { KeyLister