dedup workqueue requeuing

This commit is contained in:
deads2k 2016-06-28 15:41:49 -04:00
parent b4ee63f6a1
commit 56598898e1
2 changed files with 111 additions and 20 deletions

View File

@ -39,11 +39,12 @@ func NewDelayingQueue() DelayingInterface {
func newDelayingQueue(clock util.Clock) DelayingInterface {
ret := &delayingType{
Interface: New(),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000),
Interface: New(),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingTimeByEntry: map[t]time.Time{},
waitingForAddCh: make(chan waitFor, 1000),
}
go ret.waitingLoop()
@ -66,6 +67,8 @@ type delayingType struct {
// waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor
// waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
waitingTimeByEntry map[t]time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor
}
@ -118,6 +121,7 @@ func (q *delayingType) waitingLoop() {
if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
q.waitingTimeByEntry = nil
return
}
@ -130,6 +134,7 @@ func (q *delayingType) waitingLoop() {
break
}
q.Add(entry.data)
delete(q.waitingTimeByEntry, entry.data)
readyEntries++
}
q.waitingForAdd = q.waitingForAdd[readyEntries:]
@ -152,7 +157,7 @@ func (q *delayingType) waitingLoop() {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, waitEntry)
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
} else {
q.Add(waitEntry.data)
}
@ -162,7 +167,7 @@ func (q *delayingType) waitingLoop() {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, waitEntry)
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
} else {
q.Add(waitEntry.data)
}
@ -177,7 +182,20 @@ func (q *delayingType) waitingLoop() {
// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
func insert(entries []waitFor, entry waitFor) []waitFor {
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
// if the entry is already in our retry list and the existing time is before the new one, just skip it
existingTime, exists := knownEntries[entry.data]
if exists && existingTime.Before(entry.readyAt) {
return entries
}
// if the entry exists and is scheduled for later, go ahead and remove the entry
if exists {
if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
}
}
insertionIndex := sort.Search(len(entries), func(i int) bool {
return entry.readyAt.Before(entries[i].readyAt)
})
@ -189,5 +207,24 @@ func insert(entries []waitFor, entry waitFor) []waitFor {
// insert the record
entries[insertionIndex] = entry
knownEntries[entry.data] = entry.readyAt
return entries
}
// findEntryIndex returns the index for an existing entry
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
index := sort.Search(len(entries), func(i int) bool {
return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
})
// we know this is the earliest possible index, but there could be multiple with the same time
// iterate from here to find the dupe
for ; index < len(entries); index++ {
if entries[index].data == data {
break
}
}
return index
}

View File

@ -43,7 +43,7 @@ func TestSimpleQueue(t *testing.T) {
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(t, q, 1); err != nil {
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
@ -68,6 +68,65 @@ func TestSimpleQueue(t *testing.T) {
}
}
func TestDeduping(t *testing.T) {
fakeClock := util.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
first := "foo"
q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
q.AddAfter(first, 70*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
// step past the first block, we should receive now
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
q.Done(item)
// step past the second add
fakeClock.Step(20 * time.Millisecond)
if q.Len() != 0 {
t.Errorf("should not have added")
}
// test again, but this time the earlier should override
q.AddAfter(first, 50*time.Millisecond)
q.AddAfter(first, 30*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(40 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ = q.Get()
q.Done(item)
// step past the second add
fakeClock.Step(20 * time.Millisecond)
if q.Len() != 0 {
t.Errorf("should not have added")
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
}
func TestAddTwoFireEarly(t *testing.T) {
fakeClock := util.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
@ -88,7 +147,7 @@ func TestAddTwoFireEarly(t *testing.T) {
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(t, q, 1); err != nil {
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ := q.Get()
@ -99,7 +158,7 @@ func TestAddTwoFireEarly(t *testing.T) {
q.AddAfter(third, 2*time.Second)
fakeClock.Step(1 * time.Second)
if err := waitForAdded(t, q, 1); err != nil {
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ = q.Get()
@ -108,7 +167,7 @@ func TestAddTwoFireEarly(t *testing.T) {
}
fakeClock.Step(2 * time.Second)
if err := waitForAdded(t, q, 1); err != nil {
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ = q.Get()
@ -139,7 +198,7 @@ func TestCopyShifting(t *testing.T) {
fakeClock.Step(2 * time.Second)
if err := waitForAdded(t, q, 3); err != nil {
if err := waitForAdded(q, 3); err != nil {
t.Fatalf("unexpected err: %v", err)
}
actualFirst, _ := q.Get()
@ -156,19 +215,14 @@ func TestCopyShifting(t *testing.T) {
}
}
func waitForAdded(t *testing.T, q DelayingInterface, depth int) error {
err := wait.Poll(1*time.Millisecond, 20*time.Second, func() (done bool, err error) {
func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
return true, nil
}
return false, nil
})
if err != nil {
t.Logf("failed: len=%v, everything=%#v", q.Len(), q)
}
return err
}
func waitForWaitingQueueToFill(q DelayingInterface) error {