From 453bb17c3cde692b9a0f7f89fcfd9b9687a988a5 Mon Sep 17 00:00:00 2001 From: Joe Beda Date: Wed, 19 Oct 2016 16:09:27 -0700 Subject: [PATCH 1/4] Remove constants that are no longer referenced --- pkg/client/cache/reflector.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 745ee76240f..5410d29a28c 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -74,12 +74,6 @@ var ( // However, it can be modified to avoid periodic resync to break the // TCP connection. minWatchTimeout = 5 * time.Minute - // If we are within 'forceResyncThreshold' from the next planned resync - // and are just before issuing Watch(), resync will be forced now. - forceResyncThreshold = 3 * time.Second - // We try to set timeouts for Watch() so that we will finish about - // than 'timeoutThreshold' from next planned periodic resync. - timeoutThreshold = 1 * time.Second ) // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector From e0c6bf13b1e068540876b55b9742c53761f78079 Mon Sep 17 00:00:00 2001 From: Joe Beda Date: Thu, 20 Oct 2016 14:04:49 -0700 Subject: [PATCH 2/4] TODOs around stopping informers --- pkg/client/cache/controller.go | 9 +++++++-- pkg/client/cache/controller_test.go | 3 +++ pkg/client/cache/reflector.go | 4 ++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/client/cache/controller.go b/pkg/client/cache/controller.go index 59fd52051eb..5ab84b8dd83 100644 --- a/pkg/client/cache/controller.go +++ b/pkg/client/cache/controller.go @@ -29,7 +29,7 @@ import ( type Config struct { // The queue for your objects; either a FIFO or // a DeltaFIFO. Your Process() function should accept - // the output of this Oueue's Pop() method. + // the output of this Queue's Pop() method. Queue // Something that can list and watch your objects. @@ -121,6 +121,11 @@ func (c *Controller) Requeue(obj interface{}) error { // TODO: Consider doing the processing in parallel. This will require a little thought // to make sure that we don't end up processing the same object multiple times // concurrently. +// +// TODO: Plumb through the stopCh here (and down to the queue) so that this can +// actually exit when the controller is stopped. Or just give up on this stuff +// ever being stoppable. Converting this whole package to use Context would +// also be helpful. func (c *Controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) @@ -134,7 +139,7 @@ func (c *Controller) processLoop() { } // ResourceEventHandler can handle notifications for events that happen to a -// resource. The events are informational only, so you can't return an +// resource. The events are informational only, so you can't return an // error. // * OnAdd is called when an object is added. // * OnUpdate is called when an object is modified. Note that oldObj is the diff --git a/pkg/client/cache/controller_test.go b/pkg/client/cache/controller_test.go index ed82cfd47f1..f26c839b808 100644 --- a/pkg/client/cache/controller_test.go +++ b/pkg/client/cache/controller_test.go @@ -283,6 +283,9 @@ func TestHammerController(t *testing.T) { time.Sleep(100 * time.Millisecond) close(stop) + // TODO: Verify that no goroutines were leaked here and that everything shut + // down cleanly. + outputSetLock.Lock() t.Logf("got: %#v", outputSet) } diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 5410d29a28c..8a0d05ab479 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -45,7 +45,7 @@ import ( // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { - // name identifies this reflector. By default it will be a file:line if possible. + // name identifies this reflector. By default it will be a file:line if possible. name string // The type of object we expect to place in the store. @@ -108,7 +108,7 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, return r } -// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common +// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common // call chains to NewReflector, so they'd be low entropy names for reflectors var internalPackages = []string{"kubernetes/pkg/client/cache/", "/runtime/asm_"} From 16b3485817398a154374a46622ca60b3707a20cf Mon Sep 17 00:00:00 2001 From: Joe Beda Date: Fri, 21 Oct 2016 18:29:36 -0700 Subject: [PATCH 3/4] Adds some TODOs and small fixes to pkg/util/workqueue Adds a new unit test for queue. --- pkg/util/workqueue/delaying_queue.go | 10 ++++++-- pkg/util/workqueue/parallelizer.go | 4 +++ pkg/util/workqueue/queue.go | 2 +- pkg/util/workqueue/queue_test.go | 30 ++++++++++++++++++++++ pkg/util/workqueue/rate_limitting_queue.go | 5 ++-- 5 files changed, 46 insertions(+), 5 deletions(-) diff --git a/pkg/util/workqueue/delaying_queue.go b/pkg/util/workqueue/delaying_queue.go index 5a71b8181a1..eaa6b5bb431 100644 --- a/pkg/util/workqueue/delaying_queue.go +++ b/pkg/util/workqueue/delaying_queue.go @@ -24,7 +24,7 @@ import ( utilruntime "k8s.io/kubernetes/pkg/util/runtime" ) -// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to +// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to // requeue items after failures without ending up in a hot-loop. type DelayingInterface interface { Interface @@ -68,6 +68,9 @@ type delayingType struct { stopCh chan struct{} // heartbeat ensures we wait no more than maxWait before firing + // + // TODO: replace with Ticker (and add to clock) so this can be cleaned up. + // clock.Tick will leak. heartbeat <-chan time.Time // waitingForAdd is an ordered slice of items to be added to the contained work queue @@ -115,7 +118,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { } } -// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. +// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. // Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an // expired item sitting for more than 10 seconds. const maxWait = 10 * time.Second @@ -192,6 +195,9 @@ func (q *delayingType) waitingLoop() { // inserts the given entry into the sorted entries list // same semantics as append()... the given slice may be modified, // and the returned value should be used +// +// TODO: This should probably be converted to use container/heap to improve +// running time for a large number of items. func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor { // if the entry is already in our retry list and the existing time is before the new one, just skip it existingTime, exists := knownEntries[entry.data] diff --git a/pkg/util/workqueue/parallelizer.go b/pkg/util/workqueue/parallelizer.go index a9305935bb2..b66cebe6985 100644 --- a/pkg/util/workqueue/parallelizer.go +++ b/pkg/util/workqueue/parallelizer.go @@ -33,6 +33,10 @@ func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) { } close(toProcess) + if pieces < workers { + workers = pieces + } + wg := sync.WaitGroup{} wg.Add(workers) for i := 0; i < workers; i++ { diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go index 9a2ecad382d..3e1a49fe202 100644 --- a/pkg/util/workqueue/queue.go +++ b/pkg/util/workqueue/queue.go @@ -154,7 +154,7 @@ func (q *Type) Done(item interface{}) { } } -// Shutdown will cause q to ignore all new items added to it. As soon as the +// ShutDown will cause q to ignore all new items added to it. As soon as the // worker goroutines have drained the existing items in the queue, they will be // instructed to exit. func (q *Type) ShutDown() { diff --git a/pkg/util/workqueue/queue_test.go b/pkg/util/workqueue/queue_test.go index 625f4545fd1..4b8553672e2 100644 --- a/pkg/util/workqueue/queue_test.go +++ b/pkg/util/workqueue/queue_test.go @@ -129,3 +129,33 @@ func TestLen(t *testing.T) { t.Errorf("Expected %v, got %v", e, a) } } + +func TestReinsert(t *testing.T) { + q := workqueue.New() + q.Add("foo") + + // Start processing + i, _ := q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + // Add it back while processing + q.Add(i) + + // Finish it up + q.Done(i) + + // It should be back on the queue + i, _ = q.Get() + if i != "foo" { + t.Errorf("Expected %v, got %v", "foo", i) + } + + // Finish that one up + q.Done(i) + + if a := q.Len(); a != 0 { + t.Errorf("Expected queue to be empty. Has %v items", a) + } +} diff --git a/pkg/util/workqueue/rate_limitting_queue.go b/pkg/util/workqueue/rate_limitting_queue.go index 9a2bfbb56a7..417ac001b84 100644 --- a/pkg/util/workqueue/rate_limitting_queue.go +++ b/pkg/util/workqueue/rate_limitting_queue.go @@ -16,10 +16,10 @@ limitations under the License. package workqueue -// RateLimitingInterface is an Interface that can Add an item at a later time. This makes it easier to -// requeue items after failures without ending up in a hot-loop. +// RateLimitingInterface is an interface that rate limits items being added to the queue. type RateLimitingInterface interface { DelayingInterface + // AddRateLimited adds an item to the workqueue after the rate limiter says its ok AddRateLimited(item interface{}) @@ -27,6 +27,7 @@ type RateLimitingInterface interface { // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you // still have to call `Done` on the queue. Forget(item interface{}) + // NumRequeues returns back how many times the item was requeued NumRequeues(item interface{}) int } From 48d1505ba6f0b33b8ebf60fb5ed3b66f16338380 Mon Sep 17 00:00:00 2001 From: Joe Beda Date: Fri, 21 Oct 2016 20:04:14 -0700 Subject: [PATCH 4/4] Expand documention for wait package --- pkg/util/wait/wait.go | 143 ++++++++++++++++++++++++++++-------------- 1 file changed, 95 insertions(+), 48 deletions(-) diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index 34d833b44d9..34ed2301da8 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -36,33 +36,42 @@ var ForeverTestTimeout = time.Second * 30 // NeverStop may be passed to Until to make it never stop. var NeverStop <-chan struct{} = make(chan struct{}) -// Forever is syntactic sugar on top of Until +// Forever calls f every period for ever. +// +// Forever is syntactic sugar on top of Until. func Forever(f func(), period time.Duration) { Until(f, period, NeverStop) } // Until loops until stop channel is closed, running f every period. -// Until is syntactic sugar on top of JitterUntil with zero jitter -// factor, with sliding = true (which means the timer for period -// starts after the f completes). +// +// Until is syntactic sugar on top of JitterUntil with zero jitter factor and +// with sliding = true (which means the timer for period starts after the f +// completes). func Until(f func(), period time.Duration, stopCh <-chan struct{}) { JitterUntil(f, period, 0.0, true, stopCh) } // NonSlidingUntil loops until stop channel is closed, running f every -// period. NonSlidingUntil is syntactic sugar on top of JitterUntil -// with zero jitter factor, with sliding = false (meaning the timer for -// period starts at the same time as the function starts). +// period. +// +// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter +// factor, with sliding = false (meaning the timer for period starts at the same +// time as the function starts). func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) { JitterUntil(f, period, 0.0, false, stopCh) } // JitterUntil loops until stop channel is closed, running f every period. +// // If jitterFactor is positive, the period is jittered before every run of f. -// If jitterFactor is not positive, the period is unchanged. -// Catches any panics, and keeps going. f may not be invoked if -// stop channel is already closed. Pass NeverStop to Until if you -// don't want it stop. +// If jitterFactor is not positive, the period is unchanged and not jitterd. +// +// If slidingis true, the period is computed after f runs. If it is false then +// period includes the runtime for f. +// +// Close stopCh to stop. f may not be invoked if stop channel is already +// closed. Pass NeverStop to if you don't want it stop. func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { for { @@ -104,9 +113,11 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b } } -// Jitter returns a time.Duration between duration and duration + maxFactor * duration, -// to allow clients to avoid converging on periodic behavior. If maxFactor is 0.0, a -// suggested default value will be chosen. +// Jitter returns a time.Duration between duration and duration + maxFactor * +// duration. +// +// This allows clients to avoid converging on periodic behavior. If maxFactor +// is 0.0, a suggested default value will be chosen. func Jitter(duration time.Duration, maxFactor float64) time.Duration { if maxFactor <= 0.0 { maxFactor = 1.0 @@ -115,26 +126,31 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration { return wait } -// ErrWaitTimeout is returned when the condition exited without success +// ErrWaitTimeout is returned when the condition exited without success. var ErrWaitTimeout = errors.New("timed out waiting for the condition") // ConditionFunc returns true if the condition is satisfied, or an error // if the loop should be aborted. type ConditionFunc func() (done bool, err error) -// Backoff is parameters applied to a Backoff function. +// Backoff holds parameters applied to a Backoff function. type Backoff struct { - Duration time.Duration - Factor float64 - Jitter float64 - Steps int + Duration time.Duration // the base duration + Factor float64 // Duration is multipled by factor each iteration + Jitter float64 // The amount of jitter applied each iteration + Steps int // Exit with error after this many steps } -// ExponentialBackoff repeats a condition check up to steps times, increasing the wait -// by multipling the previous duration by factor. If jitter is greater than zero, -// a random amount of each duration is added (between duration and duration*(1+jitter)). -// If the condition never returns true, ErrWaitTimeout is returned. All other errors -// terminate immediately. +// ExponentialBackoff repeats a condition check with exponential backoff. +// +// It checks the condition up to Steps times, increasing the wait by multipling +// the previous duration by Factor. +// +// If Jitter is greater than zero, a random amount of each duration is added +// (between duration and duration*(1+jitter)). +// +// If the condition never returns true, ErrWaitTimeout is returned. All other +// errors terminate immediately. func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { duration := backoff.Duration for i := 0; i < backoff.Steps; i++ { @@ -154,22 +170,33 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { } // Poll tries a condition func until it returns true, an error, or the timeout -// is reached. condition will always be invoked at least once but some intervals -// may be missed if the condition takes too long or the time window is too short. +// is reached. +// +// Poll always waits the interval before the run of 'condition'. +// 'condition' will always be invoked at least once. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +// // If you want to Poll something forever, see PollInfinite. -// Poll always waits the interval before the first check of the condition. func Poll(interval, timeout time.Duration, condition ConditionFunc) error { return pollInternal(poller(interval, timeout), condition) } func pollInternal(wait WaitFunc, condition ConditionFunc) error { - done := make(chan struct{}) - defer close(done) - return WaitFor(wait, condition, done) + return WaitFor(wait, condition, NeverStop) } -// PollImmediate is identical to Poll, except that it performs the first check -// immediately, not waiting interval beforehand. +// PollImmediate tries a condition func until it returns true, an error, or the timeout +// is reached. +// +// Poll always checks 'condition' before waiting for the interval. 'condition' +// will always be invoked at least once. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +// +// If you want to Poll something forever, see PollInfinite. func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error { return pollImmediateInternal(poller(interval, timeout), condition) } @@ -185,16 +212,24 @@ func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error { return pollInternal(wait, condition) } -// PollInfinite polls forever. +// PollInfinite tries a condition func until it returns true or an error +// +// PollInfinite always waits the interval before the run of 'condition'. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. func PollInfinite(interval time.Duration, condition ConditionFunc) error { done := make(chan struct{}) defer close(done) return PollUntil(interval, condition, done) } -// PollImmediateInfinite is identical to PollInfinite, except that it -// performs the first check immediately, not waiting interval -// beforehand. +// PollImmediateInfinite tries a condition func until it returns true or an error +// +// PollImmediateInfinite runs the 'condition' before waiting for the interval. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error { done, err := condition() if err != nil { @@ -206,7 +241,11 @@ func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) erro return PollInfinite(interval, condition) } -// PollUntil is like Poll, but it takes a stop change instead of total duration +// PollUntil tries a condition func until it returns true, an error or stopCh is +// closed. +// +// PolUntil always waits interval before the first run of 'condition'. +// 'condition' will always be invoked at least once. func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { return WaitFor(poller(interval, 0), condition, stopCh) } @@ -215,11 +254,16 @@ func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan st // should be executed and is closed when the last test should be invoked. type WaitFunc func(done <-chan struct{}) <-chan struct{} -// WaitFor gets a channel from wait(), and then invokes fn once for every value -// placed on the channel and once more when the channel is closed. If fn -// returns an error the loop ends and that error is returned, and if fn returns -// true the loop ends and nil is returned. ErrWaitTimeout will be returned if -// the channel is closed without fn ever returning true. +// WaitFor continually checks 'fn' as driven by 'wait'. +// +// WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value +// placed on the channel and once more when the channel is closed. +// +// If 'fn' returns an error the loop ends and that error is returned, and if +// 'fn' returns true the loop ends and nil is returned. +// +// ErrWaitTimeout will be returned if the channel is closed without fn ever +// returning true. func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { c := wait(done) for { @@ -238,11 +282,14 @@ func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { return ErrWaitTimeout } -// poller returns a WaitFunc that will send to the channel every -// interval until timeout has elapsed and then close the channel. -// Over very short intervals you may receive no ticks before -// the channel is closed. If timeout is 0, the channel -// will never be closed. +// poller returns a WaitFunc that will send to the channel every interval until +// timeout has elapsed and then closes the channel. +// +// Over very short intervals you may receive no ticks before the channel is +// closed. A timeout of 0 is interpreted as an infinity. +// +// Output ticks are not buffered. If the channel is not ready to receive an +// item, the tick is skipped. func poller(interval, timeout time.Duration) WaitFunc { return WaitFunc(func(done <-chan struct{}) <-chan struct{} { ch := make(chan struct{})