Merge pull request #23444 from deads2k/add-requeue-after-duration

Automatic merge from submit-queue

add a delayed queueing option to the workqueue

Adds delayed requeuing to the workqueue so that I requeue an item to be retried at some later time in my controller with a series of backoff rules.  It lets me have the best of the retryManager and the work queue de-duping.  Tracking failures and backoffs is on the caller

@smarterclayton @pweil-  this would help us move to using the informer everywhere and de-duping at that level.
This commit is contained in:
k8s-merge-robot
2016-04-14 05:02:24 -07:00
5 changed files with 512 additions and 4 deletions

View File

@@ -28,6 +28,7 @@ type Clock interface {
Since(time.Time) time.Duration
After(d time.Duration) <-chan time.Time
Sleep(d time.Duration)
Tick(d time.Duration) <-chan time.Time
}
var (
@@ -54,6 +55,10 @@ func (RealClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
func (RealClock) Tick(d time.Duration) <-chan time.Time {
return time.Tick(d)
}
func (RealClock) Sleep(d time.Duration) {
time.Sleep(d)
}
@@ -68,8 +73,10 @@ type FakeClock struct {
}
type fakeClockWaiter struct {
targetTime time.Time
destChan chan<- time.Time
targetTime time.Time
stepInterval time.Duration
skipIfBlocked bool
destChan chan<- time.Time
}
func NewFakeClock(t time.Time) *FakeClock {
@@ -105,7 +112,22 @@ func (f *FakeClock) After(d time.Duration) <-chan time.Time {
return ch
}
// Move clock by Duration, notify anyone that's called After
func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
f.lock.Lock()
defer f.lock.Unlock()
tickTime := f.time.Add(d)
ch := make(chan time.Time, 1) // hold one tick
f.waiters = append(f.waiters, fakeClockWaiter{
targetTime: tickTime,
stepInterval: d,
skipIfBlocked: true,
destChan: ch,
})
return ch
}
// Move clock by Duration, notify anyone that's called After or Tick
func (f *FakeClock) Step(d time.Duration) {
f.lock.Lock()
defer f.lock.Unlock()
@@ -126,7 +148,23 @@ func (f *FakeClock) setTimeLocked(t time.Time) {
for i := range f.waiters {
w := &f.waiters[i]
if !w.targetTime.After(t) {
w.destChan <- t
if w.skipIfBlocked {
select {
case w.destChan <- t:
default:
}
} else {
w.destChan <- t
}
if w.stepInterval > 0 {
for !w.targetTime.After(t) {
w.targetTime = w.targetTime.Add(w.stepInterval)
}
newWaiters = append(newWaiters, *w)
}
} else {
newWaiters = append(newWaiters, f.waiters[i])
}
@@ -169,6 +207,12 @@ func (*IntervalClock) After(d time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement After")
}
// Unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement Tick")
}
func (*IntervalClock) Sleep(d time.Duration) {
panic("IntervalClock doesn't implement Sleep")
}

View File

@@ -104,3 +104,81 @@ func TestFakeAfter(t *testing.T) {
t.Errorf("unexpected non-channel read")
}
}
func TestFakeTick(t *testing.T) {
tc := NewFakeClock(time.Now())
if tc.HasWaiters() {
t.Errorf("unexpected waiter?")
}
oneSec := tc.Tick(time.Second)
if !tc.HasWaiters() {
t.Errorf("unexpected lack of waiter?")
}
oneOhOneSec := tc.Tick(time.Second + time.Millisecond)
twoSec := tc.Tick(2 * time.Second)
select {
case <-oneSec:
t.Errorf("unexpected channel read")
case <-oneOhOneSec:
t.Errorf("unexpected channel read")
case <-twoSec:
t.Errorf("unexpected channel read")
default:
}
tc.Step(999 * time.Millisecond) // t=.999
select {
case <-oneSec:
t.Errorf("unexpected channel read")
case <-oneOhOneSec:
t.Errorf("unexpected channel read")
case <-twoSec:
t.Errorf("unexpected channel read")
default:
}
tc.Step(time.Millisecond) // t=1.000
select {
case <-oneSec:
// Expected!
case <-oneOhOneSec:
t.Errorf("unexpected channel read")
case <-twoSec:
t.Errorf("unexpected channel read")
default:
t.Errorf("unexpected non-channel read")
}
tc.Step(time.Millisecond) // t=1.001
select {
case <-oneSec:
// should not double-trigger!
t.Errorf("unexpected channel read")
case <-oneOhOneSec:
// Expected!
case <-twoSec:
t.Errorf("unexpected channel read")
default:
t.Errorf("unexpected non-channel read")
}
tc.Step(time.Second) // t=2.001
tc.Step(time.Second) // t=3.001
tc.Step(time.Second) // t=4.001
tc.Step(time.Second) // t=5.001
// The one second ticker should not accumulate ticks
accumulatedTicks := 0
drained := false
for !drained {
select {
case <-oneSec:
accumulatedTicks++
default:
drained = true
}
}
if accumulatedTicks != 1 {
t.Errorf("unexpected number of accumulated ticks: %d", accumulatedTicks)
}
}

View File

@@ -0,0 +1,193 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sort"
"time"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
func NewDelayingQueue() DelayingInterface {
return newDelayingQueue(util.RealClock{})
}
func newDelayingQueue(clock util.Clock) DelayingInterface {
ret := &delayingType{
Interface: New(),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000),
}
go ret.waitingLoop()
return ret
}
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface
// clock tracks time for delayed firing
clock util.Clock
// stopCh lets us signal a shutdown to the waiting loop
stopCh chan struct{}
// heartbeat ensures we wait no more than maxWait before firing
heartbeat <-chan time.Time
// waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor
}
// waitFor holds the data to add and the time it should be added
type waitFor struct {
data t
readyAt time.Time
}
// ShutDown gives a way to shut off this queue
func (q *delayingType) ShutDown() {
q.Interface.ShutDown()
close(q.stopCh)
}
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
// expired item sitting for more than 10 seconds.
const maxWait = 10 * time.Second
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)
for {
if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
return
}
now := q.clock.Now()
// Add ready entries
readyEntries := 0
for _, entry := range q.waitingForAdd {
if entry.readyAt.After(now) {
break
}
q.Add(entry.data)
readyEntries++
}
q.waitingForAdd = q.waitingForAdd[readyEntries:]
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if len(q.waitingForAdd) > 0 {
nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
}
select {
case <-q.stopCh:
return
case <-q.heartbeat:
// continue the loop, which will add ready items
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
func insert(entries []waitFor, entry waitFor) []waitFor {
insertionIndex := sort.Search(len(entries), func(i int) bool {
return entry.readyAt.Before(entries[i].readyAt)
})
// grow by 1
entries = append(entries, waitFor{})
// shift items from the insertion point to the end
copy(entries[insertionIndex+1:], entries[insertionIndex:])
// insert the record
entries[insertionIndex] = entry
return entries
}

View File

@@ -0,0 +1,177 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestSimpleQueue(t *testing.T) {
fakeClock := util.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
first := "foo"
q.AddAfter(first, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Errorf("should have added")
}
item, _ := q.Get()
q.Done(item)
// step past the next heartbeat
fakeClock.Step(10 * time.Second)
err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
if q.Len() > 0 {
return false, fmt.Errorf("added to queue")
}
return false, nil
})
if err != wait.ErrWaitTimeout {
t.Errorf("expected timeout, got: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
}
func TestAddTwoFireEarly(t *testing.T) {
fakeClock := util.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
first := "foo"
second := "bar"
third := "baz"
q.AddAfter(first, 1*time.Second)
q.AddAfter(second, 50*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(60 * time.Millisecond)
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ := q.Get()
if !reflect.DeepEqual(item, second) {
t.Errorf("expected %v, got %v", second, item)
}
q.AddAfter(third, 2*time.Second)
fakeClock.Step(1 * time.Second)
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ = q.Get()
if !reflect.DeepEqual(item, first) {
t.Errorf("expected %v, got %v", first, item)
}
fakeClock.Step(2 * time.Second)
if err := waitForAdded(q, 1); err != nil {
t.Fatalf("unexpected err: %v", err)
}
item, _ = q.Get()
if !reflect.DeepEqual(item, third) {
t.Errorf("expected %v, got %v", third, item)
}
}
func TestCopyShifting(t *testing.T) {
fakeClock := util.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock)
first := "foo"
second := "bar"
third := "baz"
q.AddAfter(first, 1*time.Second)
q.AddAfter(second, 500*time.Millisecond)
q.AddAfter(third, 250*time.Millisecond)
if err := waitForWaitingQueueToFill(q); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if q.Len() != 0 {
t.Errorf("should not have added")
}
fakeClock.Step(2 * time.Second)
if err := waitForAdded(q, 3); err != nil {
t.Fatalf("unexpected err: %v", err)
}
actualFirst, _ := q.Get()
if !reflect.DeepEqual(actualFirst, third) {
t.Errorf("expected %v, got %v", third, actualFirst)
}
actualSecond, _ := q.Get()
if !reflect.DeepEqual(actualSecond, second) {
t.Errorf("expected %v, got %v", second, actualSecond)
}
actualThird, _ := q.Get()
if !reflect.DeepEqual(actualThird, first) {
t.Errorf("expected %v, got %v", first, actualThird)
}
}
func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
return true, nil
}
return false, nil
})
}
func waitForWaitingQueueToFill(q DelayingInterface) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if len(q.(*delayingType).waitingForAddCh) == 0 {
return true, nil
}
return false, nil
})
}

View File

@@ -20,6 +20,15 @@ import (
"sync"
)
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
// New constructs a new workqueue (see the package comment).
func New() *Type {
return &Type{
@@ -135,3 +144,10 @@ func (q *Type) ShutDown() {
q.shuttingDown = true
q.cond.Broadcast()
}
func (q *Type) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}