Merge pull request #70801 from Adirio/deltafifo-cleanup

DeltaFIFO cleanup
This commit is contained in:
k8s-ci-robot 2018-11-14 19:19:57 -08:00 committed by GitHub
commit 4fb368e5fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -320,17 +320,15 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas) newDeltas = dedupDeltas(newDeltas)
_, exists := f.items[id]
if len(newDeltas) > 0 { if len(newDeltas) > 0 {
if !exists { if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id) f.queue = append(f.queue, id)
} }
f.items[id] = newDeltas f.items[id] = newDeltas
f.cond.Broadcast() f.cond.Broadcast()
} else if exists { } else {
// We need to remove this from our map (extra items // We need to remove this from our map (extra items in the queue are
// in the queue are ignored if they are not in the // ignored if they are not in the map).
// map).
delete(f.items, id) delete(f.items, id)
} }
return nil return nil
@ -348,9 +346,6 @@ func (f *DeltaFIFO) List() []interface{} {
func (f *DeltaFIFO) listLocked() []interface{} { func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items)) list := make([]interface{}, 0, len(f.items))
for _, item := range f.items { for _, item := range f.items {
// Copy item's slice so operations on this slice
// won't interfere with the object we return.
item = copyDeltas(item)
list = append(list, item.Newest().Object) list = append(list, item.Newest().Object)
} }
return list return list
@ -398,10 +393,7 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
func (f *DeltaFIFO) IsClosed() bool { func (f *DeltaFIFO) IsClosed() bool {
f.closedLock.Lock() f.closedLock.Lock()
defer f.closedLock.Unlock() defer f.closedLock.Unlock()
if f.closed { return f.closed
return true
}
return false
} }
// Pop blocks until an item is added to the queue, and then returns it. If // Pop blocks until an item is added to the queue, and then returns it. If
@ -432,10 +424,10 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
} }
id := f.queue[0] id := f.queue[0]
f.queue = f.queue[1:] f.queue = f.queue[1:]
item, ok := f.items[id]
if f.initialPopulationCount > 0 { if f.initialPopulationCount > 0 {
f.initialPopulationCount-- f.initialPopulationCount--
} }
item, ok := f.items[id]
if !ok { if !ok {
// Item may have been deleted subsequently. // Item may have been deleted subsequently.
continue continue