diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index 0005b00ecf9..31c81b6c27c 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -42,10 +42,18 @@ func Forever(f func(), period time.Duration) { } // Until loops until stop channel is closed, running f every period. +// Until is syntactic sugar on top of JitterUntil with zero jitter factor +func Until(f func(), period time.Duration, stopCh <-chan struct{}) { + JitterUntil(f, period, 0.0, stopCh) +} + +// JitterUntil loops until stop channel is closed, running f every period. +// If jitterFactor is positive, the period is jittered before every run of f. +// If jitterFactor is not positive, the period is unchanged. // Catches any panics, and keeps going. f may not be invoked if // stop channel is already closed. Pass NeverStop to Until if you // don't want it stop. -func Until(f func(), period time.Duration, stopCh <-chan struct{}) { +func JitterUntil(f func(), period time.Duration, jitterFactor float64, stopCh <-chan struct{}) { select { case <-stopCh: return @@ -57,10 +65,16 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) { defer runtime.HandleCrash() f() }() + + jitteredPeriod := period + if jitterFactor > 0.0 { + jitteredPeriod = Jitter(period, jitterFactor) + } + select { case <-stopCh: return - case <-time.After(period): + case <-time.After(jitteredPeriod): } } } diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go index cd647892091..28de7e01ba6 100644 --- a/pkg/util/wait/wait_test.go +++ b/pkg/util/wait/wait_test.go @@ -56,6 +56,65 @@ func TestUntilReturnsImmediately(t *testing.T) { } } +func TestJitterUntil(t *testing.T) { + ch := make(chan struct{}) + // if a channel is closed JitterUntil never calls function f + // and returns imidiatelly + close(ch) + JitterUntil(func() { + t.Fatal("should not have been invoked") + }, 0, 1.0, ch) + + ch = make(chan struct{}) + called := make(chan struct{}) + go func() { + JitterUntil(func() { + called <- struct{}{} + }, 0, 1.0, ch) + close(called) + }() + <-called + close(ch) + <-called +} + +func TestJitterUntilReturnsImmediately(t *testing.T) { + now := time.Now() + ch := make(chan struct{}) + JitterUntil(func() { + close(ch) + }, 30*time.Second, 1.0, ch) + if now.Add(25 * time.Second).Before(time.Now()) { + t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func") + } +} + +func TestJitterUntilNegativeFactor(t *testing.T) { + now := time.Now() + ch := make(chan struct{}) + called := make(chan struct{}) + received := make(chan struct{}) + go func() { + JitterUntil(func() { + called <- struct{}{} + <-received + }, time.Second, -30.0, ch) + }() + // first loop + <-called + received <- struct{}{} + // second loop + <-called + close(ch) + received <- struct{}{} + + // it should take at most 2 seconds + some overhead, not 3 + if now.Add(3 * time.Second).Before(time.Now()) { + t.Errorf("JitterUntil did not returned after predefined period with negative jitter factor when the stop chan was closed inside the func") + } + +} + func TestExponentialBackoff(t *testing.T) { opts := Backoff{Factor: 1.0, Steps: 3}