Repair instead of panic when data corruption detected in DeltaFIFO

Kubernetes-commit: abcd0f82c5634c7a23b9390172b7c6ae907d84f7
This commit is contained in:
Mike Spreitzer 2020-07-02 23:23:59 -04:00 committed by Kubernetes Publisher
parent 826338c319
commit a166e2578b

View File

@ -377,8 +377,8 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj})
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
@ -390,7 +390,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
panic(fmt.Sprintf("Impossible dedupDeltas for id=%q: old items=%#+v, obj=%#+v", id, f.items[id], obj))
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; removing from queue", id, oldDeltas, obj)
f.items[id] = newDeltas
f.filterQueueLocked()
}
return nil
}
@ -457,7 +464,7 @@ func (f *DeltaFIFO) IsClosed() bool {
return f.closed
}
// Pop blocks until an item is added to the queue, and then returns it. If
// Pop blocks until the queue has some items, and then returns one. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
@ -474,35 +481,38 @@ func (f *DeltaFIFO) IsClosed() bool {
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// This should never happen
panic(fmt.Sprintf("Inconceivable! %q was in f.queue but not f.items!", id))
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
// Replace atomically does two things: (1) it adds the given objects
@ -646,6 +656,35 @@ func (f *DeltaFIFO) syncKeyLocked(key string) error {
return nil
}
// filterQueueLocked performs a partial repair of the FIFO's bookkeeping.
// It rebuilds both f.queue and f.items by walking the current queue in
// order and keeping a key only when all of the following hold:
//   - the key has not already been kept (duplicates are dropped),
//   - f.items has an entry for the key, and
//   - that entry has at least one Delta.
//
// Because the new items map is populated solely from keys found in the
// queue, any entry in f.items whose key is absent from f.queue is dropped
// as well. Every removal is logged as an error.
//
// Must be called with the lock held.
func (f *DeltaFIFO) filterQueueLocked() {
	keptKeys := make([]string, 0, len(f.queue))
	keptItems := make(map[string]Deltas, len(f.items))

	for _, key := range f.queue {
		_, alreadyKept := keptItems[key]
		deltas, present := f.items[key]

		switch {
		case alreadyKept:
			klog.Errorf("Removing duplicate key %q", key)
		case !present:
			klog.Errorf("Removing key %q because it has no Deltas", key)
		case len(deltas) == 0:
			klog.Errorf("Removing key %q because it has zero-length Deltas", key)
		default:
			// First healthy occurrence: preserve it, and its queue position.
			keptKeys = append(keptKeys, key)
			keptItems[key] = deltas
		}
	}

	klog.Info("Finished repair of queue")
	f.queue = keptKeys
	f.items = keptItems
}
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister