From 991963c925ba72b21b878cf6df17e29b3d22a8af Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Tue, 20 Dec 2016 17:23:28 -0800 Subject: [PATCH] run hack/update-staging-client-go.sh after the copy.sh changes --- .../util/workqueue/default_rate_limiters.go | 211 +++++++++++++++ .../workqueue/default_rate_limiters_test.go | 184 +++++++++++++ .../pkg/util/workqueue/delaying_queue.go | 246 ++++++++++++++++++ .../pkg/util/workqueue/delaying_queue_test.go | 236 +++++++++++++++++ .../client-go/pkg/util/workqueue/doc.go | 26 ++ .../client-go/pkg/util/workqueue/metrics.go | 195 ++++++++++++++ .../pkg/util/workqueue/parallelizer.go | 52 ++++ .../client-go/pkg/util/workqueue/queue.go | 172 ++++++++++++ .../pkg/util/workqueue/queue_test.go | 161 ++++++++++++ .../util/workqueue/rate_limitting_queue.go | 69 +++++ .../workqueue/rate_limitting_queue_test.go | 75 ++++++ .../pkg/util/workqueue/timed_queue.go | 52 ++++ .../pkg/util/workqueue/timed_queue_test.go | 38 +++ 13 files changed, 1717 insertions(+) create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/default_rate_limiters.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/default_rate_limiters_test.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/delaying_queue.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/delaying_queue_test.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/doc.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/metrics.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/parallelizer.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/queue.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/queue_test.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue_test.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/timed_queue.go create mode 100644 staging/src/k8s.io/client-go/pkg/util/workqueue/timed_queue_test.go diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/default_rate_limiters.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/default_rate_limiters.go new file mode 100644 index 00000000000..35caed4fa41 --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/default_rate_limiters.go @@ -0,0 +1,211 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "math" + "sync" + "time" + + "github.com/juju/ratelimit" +) + +type RateLimiter interface { + // When gets an item and gets to decide how long that item should wait + When(item interface{}) time.Duration + // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing + // or for success, we'll stop tracking it + Forget(item interface{}) + // NumRequeues returns back how many failures the item has had + NumRequeues(item interface{}) int +} + +// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has +// both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential +func DefaultControllerRateLimiter() RateLimiter { + return NewMaxOfRateLimiter( + NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) + &BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))}, + ) +} + +// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API +type BucketRateLimiter struct { + *ratelimit.Bucket +} + +var _ RateLimiter = &BucketRateLimiter{} + +func (r *BucketRateLimiter) When(item interface{}) time.Duration { + return r.Bucket.Take(1) +} + +func (r *BucketRateLimiter) NumRequeues(item interface{}) int { + return 0 +} + +func (r *BucketRateLimiter) Forget(item interface{}) { +} + +// ItemExponentialFailureRateLimiter does a simple baseDelay*10^ limit +// dealing with max failures and expiration are up to the caller +type ItemExponentialFailureRateLimiter struct { + failuresLock sync.Mutex + failures map[interface{}]int + + baseDelay time.Duration + maxDelay time.Duration +} + +var _ RateLimiter = &ItemExponentialFailureRateLimiter{} + +func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter { + return &ItemExponentialFailureRateLimiter{ + failures: map[interface{}]int{}, + baseDelay: baseDelay, + maxDelay: maxDelay, + } +} + +func DefaultItemBasedRateLimiter() RateLimiter { + return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second) +} + +func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + exp := r.failures[item] + r.failures[item] = r.failures[item] + 1 + + // The backoff is capped such that 'calculated' value never overflows. + backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) + if backoff > math.MaxInt64 { + return r.maxDelay + } + + calculated := time.Duration(backoff) + if calculated > r.maxDelay { + return r.maxDelay + } + + return calculated +} + +func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + return r.failures[item] +} + +func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + delete(r.failures, item) +} + +// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that +type ItemFastSlowRateLimiter struct { + failuresLock sync.Mutex + failures map[interface{}]int + + maxFastAttempts int + fastDelay time.Duration + slowDelay time.Duration +} + +var _ RateLimiter = &ItemFastSlowRateLimiter{} + +func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter { + return &ItemFastSlowRateLimiter{ + failures: map[interface{}]int{}, + fastDelay: fastDelay, + slowDelay: slowDelay, + maxFastAttempts: maxFastAttempts, + } +} + +func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + r.failures[item] = r.failures[item] + 1 + + if r.failures[item] <= r.maxFastAttempts { + return r.fastDelay + } + + return r.slowDelay +} + +func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + return r.failures[item] +} + +func (r *ItemFastSlowRateLimiter) Forget(item interface{}) { + r.failuresLock.Lock() + defer r.failuresLock.Unlock() + + delete(r.failures, item) +} + +// MaxOfRateLimiter calls every RateLimiter and returns the worst case response +// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items +// were separately delayed a longer time. +type MaxOfRateLimiter struct { + limiters []RateLimiter +} + +func (r *MaxOfRateLimiter) When(item interface{}) time.Duration { + ret := time.Duration(0) + for _, limiter := range r.limiters { + curr := limiter.When(item) + if curr > ret { + ret = curr + } + } + + return ret +} + +func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter { + return &MaxOfRateLimiter{limiters: limiters} +} + +func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int { + ret := 0 + for _, limiter := range r.limiters { + curr := limiter.NumRequeues(item) + if curr > ret { + ret = curr + } + } + + return ret +} + +func (r *MaxOfRateLimiter) Forget(item interface{}) { + for _, limiter := range r.limiters { + limiter.Forget(item) + } +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/default_rate_limiters_test.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/default_rate_limiters_test.go new file mode 100644 index 00000000000..91d34a31779 --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/default_rate_limiters_test.go @@ -0,0 +1,184 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" +) + +func TestItemExponentialFailureRateLimiter(t *testing.T) { + limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second) + + if e, a := 1*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 4*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 8*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 16*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 1*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 1*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) { + limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1000*time.Second) + for i := 0; i < 5; i++ { + limiter.When("one") + } + if e, a := 32*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + for i := 0; i < 1000; i++ { + limiter.When("overflow1") + } + if e, a := 1000*time.Second, limiter.When("overflow1"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter = NewItemExponentialFailureRateLimiter(1*time.Minute, 1000*time.Hour) + for i := 0; i < 2; i++ { + limiter.When("two") + } + if e, a := 4*time.Minute, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + for i := 0; i < 1000; i++ { + limiter.When("overflow2") + } + if e, a := 1000*time.Hour, limiter.When("overflow2"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestItemFastSlowRateLimiter(t *testing.T) { + limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3) + + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 10*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} + +func TestMaxOfRateLimiter(t *testing.T) { + limiter := NewMaxOfRateLimiter( + NewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3), + NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second), + ) + + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 3*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 3*time.Second, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, limiter.NumRequeues("two"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + limiter.Forget("one") + if e, a := 0, limiter.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 5*time.Millisecond, limiter.When("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/delaying_queue.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/delaying_queue.go new file mode 100644 index 00000000000..9ad7960e8f5 --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/delaying_queue.go @@ -0,0 +1,246 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sort" + "time" + + "k8s.io/client-go/pkg/util/clock" + utilruntime "k8s.io/client-go/pkg/util/runtime" +) + +// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to +// requeue items after failures without ending up in a hot-loop. +type DelayingInterface interface { + Interface + // AddAfter adds an item to the workqueue after the indicated duration has passed + AddAfter(item interface{}, duration time.Duration) +} + +// NewDelayingQueue constructs a new workqueue with delayed queuing ability +func NewDelayingQueue() DelayingInterface { + return newDelayingQueue(clock.RealClock{}, "") +} + +func NewNamedDelayingQueue(name string) DelayingInterface { + return newDelayingQueue(clock.RealClock{}, name) +} + +func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { + ret := &delayingType{ + Interface: NewNamed(name), + clock: clock, + heartbeat: clock.Tick(maxWait), + stopCh: make(chan struct{}), + waitingTimeByEntry: map[t]time.Time{}, + waitingForAddCh: make(chan waitFor, 1000), + metrics: newRetryMetrics(name), + } + + go ret.waitingLoop() + + return ret +} + +// delayingType wraps an Interface and provides delayed re-enquing +type delayingType struct { + Interface + + // clock tracks time for delayed firing + clock clock.Clock + + // stopCh lets us signal a shutdown to the waiting loop + stopCh chan struct{} + + // heartbeat ensures we wait no more than maxWait before firing + // + // TODO: replace with Ticker (and add to clock) so this can be cleaned up. + // clock.Tick will leak. + heartbeat <-chan time.Time + + // waitingForAdd is an ordered slice of items to be added to the contained work queue + waitingForAdd []waitFor + // waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes + waitingTimeByEntry map[t]time.Time + // waitingForAddCh is a buffered channel that feeds waitingForAdd + waitingForAddCh chan waitFor + + // metrics counts the number of retries + metrics retryMetrics +} + +// waitFor holds the data to add and the time it should be added +type waitFor struct { + data t + readyAt time.Time +} + +// ShutDown gives a way to shut off this queue +func (q *delayingType) ShutDown() { + q.Interface.ShutDown() + close(q.stopCh) +} + +// AddAfter adds the given item to the work queue after the given delay +func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { + // don't add if we're already shutting down + if q.ShuttingDown() { + return + } + + q.metrics.retry() + + // immediately add things with no delay + if duration <= 0 { + q.Add(item) + return + } + + select { + case <-q.stopCh: + // unblock if ShutDown() is called + case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: + } +} + +// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. +// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an +// expired item sitting for more than 10 seconds. +const maxWait = 10 * time.Second + +// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. +func (q *delayingType) waitingLoop() { + defer utilruntime.HandleCrash() + + // Make a placeholder channel to use when there are no items in our list + never := make(<-chan time.Time) + + for { + if q.Interface.ShuttingDown() { + // discard waiting entries + q.waitingForAdd = nil + q.waitingTimeByEntry = nil + return + } + + now := q.clock.Now() + + // Add ready entries + readyEntries := 0 + for _, entry := range q.waitingForAdd { + if entry.readyAt.After(now) { + break + } + q.Add(entry.data) + delete(q.waitingTimeByEntry, entry.data) + readyEntries++ + } + q.waitingForAdd = q.waitingForAdd[readyEntries:] + + // Set up a wait for the first item's readyAt (if one exists) + nextReadyAt := never + if len(q.waitingForAdd) > 0 { + nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now)) + } + + select { + case <-q.stopCh: + return + + case <-q.heartbeat: + // continue the loop, which will add ready items + + case <-nextReadyAt: + // continue the loop, which will add ready items + + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry) + } else { + q.Add(waitEntry.data) + } + + drained := false + for !drained { + select { + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry) + } else { + q.Add(waitEntry.data) + } + default: + drained = true + } + } + } + } +} + +// inserts the given entry into the sorted entries list +// same semantics as append()... the given slice may be modified, +// and the returned value should be used +// +// TODO: This should probably be converted to use container/heap to improve +// running time for a large number of items. +func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor { + // if the entry is already in our retry list and the existing time is before the new one, just skip it + existingTime, exists := knownEntries[entry.data] + if exists && existingTime.Before(entry.readyAt) { + return entries + } + + // if the entry exists and is scheduled for later, go ahead and remove the entry + if exists { + if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) { + entries = append(entries[:existingIndex], entries[existingIndex+1:]...) + } + } + + insertionIndex := sort.Search(len(entries), func(i int) bool { + return entry.readyAt.Before(entries[i].readyAt) + }) + + // grow by 1 + entries = append(entries, waitFor{}) + // shift items from the insertion point to the end + copy(entries[insertionIndex+1:], entries[insertionIndex:]) + // insert the record + entries[insertionIndex] = entry + + knownEntries[entry.data] = entry.readyAt + + return entries +} + +// findEntryIndex returns the index for an existing entry +func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int { + index := sort.Search(len(entries), func(i int) bool { + return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt + }) + + // we know this is the earliest possible index, but there could be multiple with the same time + // iterate from here to find the dupe + for ; index < len(entries); index++ { + if entries[index].data == data { + break + } + } + + return index +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/delaying_queue_test.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/delaying_queue_test.go new file mode 100644 index 00000000000..b98486daa4a --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/delaying_queue_test.go @@ -0,0 +1,236 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "fmt" + "reflect" + "testing" + "time" + + "k8s.io/client-go/pkg/util/clock" + "k8s.io/client-go/pkg/util/wait" +) + +func TestSimpleQueue(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock, "") + + first := "foo" + + q.AddAfter(first, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(60 * time.Millisecond) + + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ := q.Get() + q.Done(item) + + // step past the next heartbeat + fakeClock.Step(10 * time.Second) + + err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) { + if q.Len() > 0 { + return false, fmt.Errorf("added to queue") + } + + return false, nil + }) + if err != wait.ErrWaitTimeout { + t.Errorf("expected timeout, got: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } +} + +func TestDeduping(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock, "") + + first := "foo" + + q.AddAfter(first, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + q.AddAfter(first, 70*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if q.Len() != 0 { + t.Errorf("should not have added") + } + + // step past the first block, we should receive now + fakeClock.Step(60 * time.Millisecond) + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ := q.Get() + q.Done(item) + + // step past the second add + fakeClock.Step(20 * time.Millisecond) + if q.Len() != 0 { + t.Errorf("should not have added") + } + + // test again, but this time the earlier should override + q.AddAfter(first, 50*time.Millisecond) + q.AddAfter(first, 30*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(40 * time.Millisecond) + if err := waitForAdded(q, 1); err != nil { + t.Errorf("should have added") + } + item, _ = q.Get() + q.Done(item) + + // step past the second add + fakeClock.Step(20 * time.Millisecond) + if q.Len() != 0 { + t.Errorf("should not have added") + } + if q.Len() != 0 { + t.Errorf("should not have added") + } +} + +func TestAddTwoFireEarly(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock, "") + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 50*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(60 * time.Millisecond) + + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ := q.Get() + if !reflect.DeepEqual(item, second) { + t.Errorf("expected %v, got %v", second, item) + } + + q.AddAfter(third, 2*time.Second) + + fakeClock.Step(1 * time.Second) + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ = q.Get() + if !reflect.DeepEqual(item, first) { + t.Errorf("expected %v, got %v", first, item) + } + + fakeClock.Step(2 * time.Second) + if err := waitForAdded(q, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + item, _ = q.Get() + if !reflect.DeepEqual(item, third) { + t.Errorf("expected %v, got %v", third, item) + } + +} + +func TestCopyShifting(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + q := newDelayingQueue(fakeClock, "") + + first := "foo" + second := "bar" + third := "baz" + + q.AddAfter(first, 1*time.Second) + q.AddAfter(second, 500*time.Millisecond) + q.AddAfter(third, 250*time.Millisecond) + if err := waitForWaitingQueueToFill(q); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if q.Len() != 0 { + t.Errorf("should not have added") + } + + fakeClock.Step(2 * time.Second) + + if err := waitForAdded(q, 3); err != nil { + t.Fatalf("unexpected err: %v", err) + } + actualFirst, _ := q.Get() + if !reflect.DeepEqual(actualFirst, third) { + t.Errorf("expected %v, got %v", third, actualFirst) + } + actualSecond, _ := q.Get() + if !reflect.DeepEqual(actualSecond, second) { + t.Errorf("expected %v, got %v", second, actualSecond) + } + actualThird, _ := q.Get() + if !reflect.DeepEqual(actualThird, first) { + t.Errorf("expected %v, got %v", first, actualThird) + } +} + +func waitForAdded(q DelayingInterface, depth int) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if q.Len() == depth { + return true, nil + } + + return false, nil + }) +} + +func waitForWaitingQueueToFill(q DelayingInterface) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if len(q.(*delayingType).waitingForAddCh) == 0 { + return true, nil + } + + return false, nil + }) +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/doc.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/doc.go new file mode 100644 index 00000000000..bb03f56a24d --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package workqueue provides a simple queue that supports the following +// features: +// * Fair: items processed in the order in which they are added. +// * Stingy: a single item will not be processed multiple times concurrently, +// and if an item is added multiple times before it can be processed, it +// will only be processed once. +// * Multiple consumers and producers. In particular, it is allowed for an +// item to be reenqueued while it is being processed. +// * Shutdown notifications. +package workqueue // import "k8s.io/client-go/pkg/util/workqueue" diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/metrics.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/metrics.go new file mode 100644 index 00000000000..a481bdfb266 --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/metrics.go @@ -0,0 +1,195 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sync" + "time" +) + +// This file provides abstractions for setting the provider (e.g., prometheus) +// of metrics. + +type queueMetrics interface { + add(item t) + get(item t) + done(item t) +} + +// GaugeMetric represents a single numerical value that can arbitrarily go up +// and down. +type GaugeMetric interface { + Inc() + Dec() +} + +// CounterMetric represents a single numerical value that only ever +// goes up. +type CounterMetric interface { + Inc() +} + +// SummaryMetric captures individual observations. +type SummaryMetric interface { + Observe(float64) +} + +type noopMetric struct{} + +func (noopMetric) Inc() {} +func (noopMetric) Dec() {} +func (noopMetric) Observe(float64) {} + +type defaultQueueMetrics struct { + // current depth of a workqueue + depth GaugeMetric + // total number of adds handled by a workqueue + adds CounterMetric + // how long an item stays in a workqueue + latency SummaryMetric + // how long processing an item from a workqueue takes + workDuration SummaryMetric + addTimes map[t]time.Time + processingStartTimes map[t]time.Time +} + +func (m *defaultQueueMetrics) add(item t) { + if m == nil { + return + } + + m.adds.Inc() + m.depth.Inc() + if _, exists := m.addTimes[item]; !exists { + m.addTimes[item] = time.Now() + } +} + +func (m *defaultQueueMetrics) get(item t) { + if m == nil { + return + } + + m.depth.Dec() + m.processingStartTimes[item] = time.Now() + if startTime, exists := m.addTimes[item]; exists { + m.latency.Observe(sinceInMicroseconds(startTime)) + delete(m.addTimes, item) + } +} + +func (m *defaultQueueMetrics) done(item t) { + if m == nil { + return + } + + if startTime, exists := m.processingStartTimes[item]; exists { + m.workDuration.Observe(sinceInMicroseconds(startTime)) + delete(m.processingStartTimes, item) + } +} + +// Gets the time since the specified start in microseconds. +func sinceInMicroseconds(start time.Time) float64 { + return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) +} + +type retryMetrics interface { + retry() +} + +type defaultRetryMetrics struct { + retries CounterMetric +} + +func (m *defaultRetryMetrics) retry() { + if m == nil { + return + } + + m.retries.Inc() +} + +// MetricsProvider generates various metrics used by the queue. +type MetricsProvider interface { + NewDepthMetric(name string) GaugeMetric + NewAddsMetric(name string) CounterMetric + NewLatencyMetric(name string) SummaryMetric + NewWorkDurationMetric(name string) SummaryMetric + NewRetriesMetric(name string) CounterMetric +} + +type noopMetricsProvider struct{} + +func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric { + return noopMetric{} +} + +func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric { + return noopMetric{} +} + +func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric { + return noopMetric{} +} + +func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { + return noopMetric{} +} + +func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { + return noopMetric{} +} + +var metricsFactory = struct { + metricsProvider MetricsProvider + setProviders sync.Once +}{ + metricsProvider: noopMetricsProvider{}, +} + +func newQueueMetrics(name string) queueMetrics { + var ret *defaultQueueMetrics + if len(name) == 0 { + return ret + } + return &defaultQueueMetrics{ + depth: metricsFactory.metricsProvider.NewDepthMetric(name), + adds: metricsFactory.metricsProvider.NewAddsMetric(name), + latency: metricsFactory.metricsProvider.NewLatencyMetric(name), + workDuration: metricsFactory.metricsProvider.NewWorkDurationMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, + } +} + +func newRetryMetrics(name string) retryMetrics { + var ret *defaultRetryMetrics + if len(name) == 0 { + return ret + } + return &defaultRetryMetrics{ + retries: metricsFactory.metricsProvider.NewRetriesMetric(name), + } +} + +// SetProvider sets the metrics provider of the metricsFactory. +func SetProvider(metricsProvider MetricsProvider) { + metricsFactory.setProviders.Do(func() { + metricsFactory.metricsProvider = metricsProvider + }) +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/parallelizer.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/parallelizer.go new file mode 100644 index 00000000000..b103e865cd0 --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/parallelizer.go @@ -0,0 +1,52 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sync" + + utilruntime "k8s.io/client-go/pkg/util/runtime" +) + +type DoWorkPieceFunc func(piece int) + +// Parallelize is a very simple framework that allow for parallelizing +// N independent pieces of work. +func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) { + toProcess := make(chan int, pieces) + for i := 0; i < pieces; i++ { + toProcess <- i + } + close(toProcess) + + if pieces < workers { + workers = pieces + } + + wg := sync.WaitGroup{} + wg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer utilruntime.HandleCrash() + defer wg.Done() + for piece := range toProcess { + doWorkPiece(piece) + } + }() + } + wg.Wait() +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/queue.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/queue.go new file mode 100644 index 00000000000..3e1a49fe202 --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/queue.go @@ -0,0 +1,172 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sync" +) + +type Interface interface { + Add(item interface{}) + Len() int + Get() (item interface{}, shutdown bool) + Done(item interface{}) + ShutDown() + ShuttingDown() bool +} + +// New constructs a new workqueue (see the package comment). +func New() *Type { + return NewNamed("") +} + +func NewNamed(name string) *Type { + return &Type{ + dirty: set{}, + processing: set{}, + cond: sync.NewCond(&sync.Mutex{}), + metrics: newQueueMetrics(name), + } +} + +// Type is a work queue (see the package comment). +type Type struct { + // queue defines the order in which we will work on items. Every + // element of queue should be in the dirty set and not in the + // processing set. + queue []t + + // dirty defines all of the items that need to be processed. + dirty set + + // Things that are currently being processed are in the processing set. + // These things may be simultaneously in the dirty set. When we finish + // processing something and remove it from this set, we'll check if + // it's in the dirty set, and if so, add it to the queue. + processing set + + cond *sync.Cond + + shuttingDown bool + + metrics queueMetrics +} + +type empty struct{} +type t interface{} +type set map[t]empty + +func (s set) has(item t) bool { + _, exists := s[item] + return exists +} + +func (s set) insert(item t) { + s[item] = empty{} +} + +func (s set) delete(item t) { + delete(s, item) +} + +// Add marks item as needing processing. +func (q *Type) Add(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if q.shuttingDown { + return + } + if q.dirty.has(item) { + return + } + + q.metrics.add(item) + + q.dirty.insert(item) + if q.processing.has(item) { + return + } + + q.queue = append(q.queue, item) + q.cond.Signal() +} + +// Len returns the current queue length, for informational purposes only. You +// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular +// value, that can't be synchronized properly. +func (q *Type) Len() int { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return len(q.queue) +} + +// Get blocks until it can return an item to be processed. If shutdown = true, +// the caller should end their goroutine. You must call Done with item when you +// have finished processing it. +func (q *Type) Get() (item interface{}, shutdown bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + for len(q.queue) == 0 && !q.shuttingDown { + q.cond.Wait() + } + if len(q.queue) == 0 { + // We must be shutting down. + return nil, true + } + + item, q.queue = q.queue[0], q.queue[1:] + + q.metrics.get(item) + + q.processing.insert(item) + q.dirty.delete(item) + + return item, false +} + +// Done marks item as done processing, and if it has been marked as dirty again +// while it was being processed, it will be re-added to the queue for +// re-processing. +func (q *Type) Done(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.metrics.done(item) + + q.processing.delete(item) + if q.dirty.has(item) { + q.queue = append(q.queue, item) + q.cond.Signal() + } +} + +// ShutDown will cause q to ignore all new items added to it. As soon as the +// worker goroutines have drained the existing items in the queue, they will be +// instructed to exit. +func (q *Type) ShutDown() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.shuttingDown = true + q.cond.Broadcast() +} + +func (q *Type) ShuttingDown() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + return q.shuttingDown +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/queue_test.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/queue_test.go new file mode 100644 index 00000000000..eb38c1238fd --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/queue_test.go @@ -0,0 +1,161 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue_test + +import ( + "sync" + "testing" + "time" + + "k8s.io/client-go/pkg/util/workqueue" +) + +func TestBasic(t *testing.T) { + // If something is seriously wrong this test will never complete. + q := workqueue.New() + + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + for j := 0; j < 50; j++ { + q.Add(i) + time.Sleep(time.Millisecond) + } + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + for { + item, quit := q.Get() + if item == "added after shutdown!" { + t.Errorf("Got an item added after shutdown.") + } + if quit { + return + } + t.Logf("Worker %v: begin processing %v", i, item) + time.Sleep(3 * time.Millisecond) + t.Logf("Worker %v: done processing %v", i, item) + q.Done(item) + } + }(i) + } + + producerWG.Wait() + q.ShutDown() + q.Add("added after shutdown!") + consumerWG.Wait() +} + +func TestAddWhileProcessing(t *testing.T) { + q := workqueue.New() + + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + q.Add(i) + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + // Every worker will re-add every item up to two times. + // This tests the dirty-while-processing case. + counters := map[interface{}]int{} + for { + item, quit := q.Get() + if quit { + return + } + counters[item]++ + if counters[item] < 2 { + q.Add(item) + } + q.Done(item) + } + }(i) + } + + producerWG.Wait() + q.ShutDown() + consumerWG.Wait() +} + +func TestLen(t *testing.T) { + q := workqueue.New() + q.Add("foo") + if e, a := 1, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + q.Add("bar") + if e, a := 2, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + q.Add("foo") // should not increase the queue length. + if e, a := 2, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } +} + +func TestReinsert(t *testing.T) { + q := workqueue.New() + q.Add("foo") + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + // Add it back while processing + q.Add(i) + + // Finish it up + q.Done(i) + + // It should be back on the queue + i, _ = q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + // Finish that one up + q.Done(i) + + if a := q.Len(); a != 0 { + t.Errorf("Expected queue to be empty. Has %v items", a) + } +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue.go new file mode 100644 index 00000000000..417ac001b84 --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue.go @@ -0,0 +1,69 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +// RateLimitingInterface is an interface that rate limits items being added to the queue. +type RateLimitingInterface interface { + DelayingInterface + + // AddRateLimited adds an item to the workqueue after the rate limiter says its ok + AddRateLimited(item interface{}) + + // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing + // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you + // still have to call `Done` on the queue. + Forget(item interface{}) + + // NumRequeues returns back how many times the item was requeued + NumRequeues(item interface{}) int +} + +// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability +// Remember to call Forget! If you don't, you may end up tracking failures forever. +func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface { + return &rateLimitingType{ + DelayingInterface: NewDelayingQueue(), + rateLimiter: rateLimiter, + } +} + +func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface { + return &rateLimitingType{ + DelayingInterface: NewNamedDelayingQueue(name), + rateLimiter: rateLimiter, + } +} + +// rateLimitingType wraps an Interface and provides rateLimited re-enquing +type rateLimitingType struct { + DelayingInterface + + rateLimiter RateLimiter +} + +// AddRateLimited AddAfter's the item based on the time when the rate limiter says its ok +func (q *rateLimitingType) AddRateLimited(item interface{}) { + q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item)) +} + +func (q *rateLimitingType) NumRequeues(item interface{}) int { + return q.rateLimiter.NumRequeues(item) +} + +func (q *rateLimitingType) Forget(item interface{}) { + q.rateLimiter.Forget(item) +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue_test.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue_test.go new file mode 100644 index 00000000000..f09d2ec9e9a --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/rate_limitting_queue_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" + + "k8s.io/client-go/pkg/util/clock" +) + +func TestRateLimitingQueue(t *testing.T) { + limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second) + queue := NewRateLimitingQueue(limiter).(*rateLimitingType) + fakeClock := clock.NewFakeClock(time.Now()) + delayingQueue := &delayingType{ + Interface: New(), + clock: fakeClock, + heartbeat: fakeClock.Tick(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan waitFor, 1000), + metrics: newRetryMetrics(""), + } + queue.DelayingInterface = delayingQueue + + queue.AddRateLimited("one") + waitEntry := <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("one") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + if e, a := 2, queue.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + queue.AddRateLimited("two") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("two") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + + queue.Forget("one") + if e, a := 0, queue.NumRequeues("one"); e != a { + t.Errorf("expected %v, got %v", e, a) + } + queue.AddRateLimited("one") + waitEntry = <-delayingQueue.waitingForAddCh + if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { + t.Errorf("expected %v, got %v", e, a) + } + +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/timed_queue.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/timed_queue.go new file mode 100644 index 00000000000..2ad90bfdfcf --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/timed_queue.go @@ -0,0 +1,52 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import "time" + +type TimedWorkQueue struct { + *Type +} + +type TimedWorkQueueItem struct { + StartTime time.Time + Object interface{} +} + +func NewTimedWorkQueue() *TimedWorkQueue { + return &TimedWorkQueue{New()} +} + +// Add adds the obj along with the current timestamp to the queue. +func (q TimedWorkQueue) Add(timedItem *TimedWorkQueueItem) { + q.Type.Add(timedItem) +} + +// Get gets the obj along with its timestamp from the queue. +func (q TimedWorkQueue) Get() (timedItem *TimedWorkQueueItem, shutdown bool) { + origin, shutdown := q.Type.Get() + if origin == nil { + return nil, shutdown + } + timedItem, _ = origin.(*TimedWorkQueueItem) + return timedItem, shutdown +} + +func (q TimedWorkQueue) Done(timedItem *TimedWorkQueueItem) error { + q.Type.Done(timedItem) + return nil +} diff --git a/staging/src/k8s.io/client-go/pkg/util/workqueue/timed_queue_test.go b/staging/src/k8s.io/client-go/pkg/util/workqueue/timed_queue_test.go new file mode 100644 index 00000000000..3498a4b8cca --- /dev/null +++ b/staging/src/k8s.io/client-go/pkg/util/workqueue/timed_queue_test.go @@ -0,0 +1,38 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "testing" + "time" + + "k8s.io/client-go/pkg/api/v1" +) + +func TestNoMemoryLeak(t *testing.T) { + timedQueue := NewTimedWorkQueue() + timedQueue.Add(&TimedWorkQueueItem{Object: &v1.Pod{}, StartTime: time.Time{}}) + item, _ := timedQueue.Get() + timedQueue.Add(item) + // The item should still be in the timedQueue. + timedQueue.Done(item) + item, _ = timedQueue.Get() + timedQueue.Done(item) + if len(timedQueue.Type.processing) != 0 { + t.Errorf("expect timedQueue.Type.processing to be empty!") + } +}