diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go
index 5a71b8181a1..eaa6b5bb431 100644
--- a/pkg/util/workqueue/delaying_queue.go
+++ b/pkg/util/workqueue/delaying_queue.go
@@ -24,7 +24,7 @@ import (
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 )
 
-// DelayingInterface is an Interface that can Add an item at a later time.  This makes it easier to
+// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
 // requeue items after failures without ending up in a hot-loop.
 type DelayingInterface interface {
 	Interface
@@ -68,6 +68,9 @@ type delayingType struct {
 	stopCh chan struct{}
 
 	// heartbeat ensures we wait no more than maxWait before firing
+	//
+	// TODO: replace with Ticker (and add to clock) so this can be cleaned up.
+	// clock.Tick will leak.
 	heartbeat <-chan time.Time
 
 	// waitingForAdd is an ordered slice of items to be added to the contained work queue
@@ -115,7 +118,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
 	}
 }
 
-// maxWait keeps a max bound on the wait time.  It's just insurance against weird things happening.
+// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
 // Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
 // expired item sitting for more than 10 seconds.
 const maxWait = 10 * time.Second
@@ -192,6 +195,9 @@ func (q *delayingType) waitingLoop() {
 // inserts the given entry into the sorted entries list
 // same semantics as append()... the given slice may be modified,
 // and the returned value should be used
+//
+// TODO: This should probably be converted to use container/heap to improve
+// running time for a large number of items.
 func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
 	// if the entry is already in our retry list and the existing time is before the new one, just skip it
 	existingTime, exists := knownEntries[entry.data]
diff --git a/pkg/util/workqueue/parallelizer.go b/pkg/util/workqueue/parallelizer.go
index a9305935bb2..b66cebe6985 100644
--- a/pkg/util/workqueue/parallelizer.go
+++ b/pkg/util/workqueue/parallelizer.go
@@ -33,6 +33,10 @@ func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
 	}
 	close(toProcess)
 
+	if pieces < workers {
+		workers = pieces
+	}
+
 	wg := sync.WaitGroup{}
 	wg.Add(workers)
 	for i := 0; i < workers; i++ {
diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go
index 9a2ecad382d..3e1a49fe202 100644
--- a/pkg/util/workqueue/queue.go
+++ b/pkg/util/workqueue/queue.go
@@ -154,7 +154,7 @@ func (q *Type) Done(item interface{}) {
 	}
 }
 
-// Shutdown will cause q to ignore all new items added to it. As soon as the
+// ShutDown will cause q to ignore all new items added to it. As soon as the
 // worker goroutines have drained the existing items in the queue, they will be
 // instructed to exit.
 func (q *Type) ShutDown() {
diff --git a/pkg/util/workqueue/queue_test.go b/pkg/util/workqueue/queue_test.go
index 625f4545fd1..4b8553672e2 100644
--- a/pkg/util/workqueue/queue_test.go
+++ b/pkg/util/workqueue/queue_test.go
@@ -129,3 +129,33 @@ func TestLen(t *testing.T) {
 		t.Errorf("Expected %v, got %v", e, a)
 	}
 }
+
+func TestReinsert(t *testing.T) {
+	q := workqueue.New()
+	q.Add("foo")
+
+	// Start processing
+	i, _ := q.Get()
+	if i != "foo" {
+		t.Errorf("Expected %v, got %v", "foo", i)
+	}
+
+	// Add it back while processing
+	q.Add(i)
+
+	// Finish it up
+	q.Done(i)
+
+	// It should be back on the queue
+	i, _ = q.Get()
+	if i != "foo" {
+		t.Errorf("Expected %v, got %v", "foo", i)
+	}
+
+	// Finish that one up
+	q.Done(i)
+
+	if a := q.Len(); a != 0 {
+		t.Errorf("Expected queue to be empty. Has %v items", a)
+	}
+}
diff --git a/pkg/util/workqueue/rate_limitting_queue.go b/pkg/util/workqueue/rate_limitting_queue.go
index 9a2bfbb56a7..417ac001b84 100644
--- a/pkg/util/workqueue/rate_limitting_queue.go
+++ b/pkg/util/workqueue/rate_limitting_queue.go
@@ -16,10 +16,10 @@ limitations under the License.
 
 package workqueue
 
-// RateLimitingInterface is an Interface that can Add an item at a later time.  This makes it easier to
-// requeue items after failures without ending up in a hot-loop.
+// RateLimitingInterface is an interface that rate limits items being added to the queue.
 type RateLimitingInterface interface {
 	DelayingInterface
+
 	// AddRateLimited adds an item to the workqueue after the rate limiter says its ok
 	AddRateLimited(item interface{})
 
@@ -27,6 +27,7 @@ type RateLimitingInterface interface {
 	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
 	// still have to call `Done` on the queue.
 	Forget(item interface{})
+
 	// NumRequeues returns back how many times the item was requeued
 	NumRequeues(item interface{}) int
 }
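
The TODO added to insert() in delaying_queue.go suggests moving to container/heap. As a rough illustration of what that could look like, here is a minimal sketch of a readiness-ordered priority queue; the waitFor shape (an item plus the time it becomes ready) and all names below are assumptions for the example, not the package's actual types.

// Sketch of the container/heap replacement suggested by the TODO on insert():
// the earliest-ready entry stays at index 0, and heap.Push/heap.Pop run in
// O(log n) versus the O(n) copy needed to insert into a sorted slice.
package heapsketch

import (
	"container/heap"
	"time"
)

// waitFor is an assumed shape: the queued item plus the time it becomes ready.
type waitFor struct {
	data    interface{}
	readyAt time.Time
}

type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int { return len(pq) }
func (pq waitForPriorityQueue) Less(i, j int) bool {
	return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }

// Push and Pop operate on the tail; heap.Push and heap.Pop restore ordering.
func (pq *waitForPriorityQueue) Push(x interface{}) { *pq = append(*pq, x.(*waitFor)) }
func (pq *waitForPriorityQueue) Pop() interface{} {
	old := *pq
	entry := old[len(old)-1]
	*pq = old[:len(old)-1]
	return entry
}

// Compile-time check that the sketch satisfies heap.Interface.
var _ heap.Interface = &waitForPriorityQueue{}

A waiting loop built on this would peek at pq[0] to pick its next timer instead of scanning the slice.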
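The parallelizer.go hunk clamps workers to pieces, so asking for more workers than there are pieces no longer spawns goroutines that find the channel already drained and exit immediately. The standalone sketch below mirrors the pattern in that hunk (buffered channel of piece indices, WaitGroup, the new clamp); the lower-case parallelize name and the rest of the scaffolding are illustrative, not the package's exact code.

// Minimal sketch of the Parallelize pattern touched by the hunk above.
package main

import (
	"fmt"
	"sync"
)

type DoWorkPieceFunc func(piece int)

func parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
	// Feed every piece index through a buffered channel, then close it.
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	// The fix from the hunk: never start more workers than there are pieces.
	if pieces < workers {
		workers = pieces
	}

	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				doWorkPiece(piece)
			}
		}()
	}
	wg.Wait()
}

func main() {
	// 16 requested workers, 3 pieces: only 3 goroutines are started.
	parallelize(16, 3, func(piece int) { fmt.Println("piece", piece) })
}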
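The RateLimitingInterface comments imply a particular calling convention for consumers: Forget when processing succeeds, AddRateLimited when it fails and should be retried, and Done in every case, since Forget only clears the rate limiter. A hedged sketch of such a worker loop follows; maxRetries, the sync callback, and the worker function itself are illustrative, and constructing a concrete rate-limited queue is out of scope here.

// Sketch of a consumer loop over the interface documented in the diff.
package workerloop

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

const maxRetries = 5 // illustrative bound, not defined by the package

func worker(queue workqueue.RateLimitingInterface, sync func(item interface{}) error) {
	for {
		item, shutdown := queue.Get()
		if shutdown {
			return
		}

		if err := sync(item); err == nil {
			// Success: stop the rate limiter from tracking this item.
			queue.Forget(item)
		} else if queue.NumRequeues(item) < maxRetries {
			// Retryable failure: put it back after the rate limiter's backoff.
			queue.AddRateLimited(item)
		} else {
			// Too many attempts: clear the rate-limiter history and drop it.
			fmt.Printf("dropping %v after %d requeues: %v\n", item, maxRetries, err)
			queue.Forget(item)
		}

		// Forget only clears the rateLimiter; Done must still be called.
		queue.Done(item)
	}
}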