From 3fa290980a34490015fd8d4392a4d37130e81b67 Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Fri, 22 Jan 2016 14:00:08 +0100 Subject: [PATCH 1/2] jitter period in each run of Until In order to synchronize the current state of Kubernetes's objects (e.g. pods, containers, etc.), periodic synch loops are run. When there is a lot of objects to synchronize with, loops increase communication traffic. At some point when all the traffic interfere cpu usage curve hits the roof causing 100% cpu utilization. To distribute the traffic in time, some sync loops can jitter their period in each loop and help to flatten the curve. This commit adds JitterUntil function with jitterFactor parameter. JitterUntil generalizes Until which is a special case for jitterFactor being zero. --- pkg/util/wait/wait.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index 0005b00ecf9..31c81b6c27c 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -42,10 +42,18 @@ func Forever(f func(), period time.Duration) { } // Until loops until stop channel is closed, running f every period. +// Until is syntactic sugar on top of JitterUntil with zero jitter factor +func Until(f func(), period time.Duration, stopCh <-chan struct{}) { + JitterUntil(f, period, 0.0, stopCh) +} + +// JitterUntil loops until stop channel is closed, running f every period. +// If jitterFactor is positive, the period is jittered before every run of f. +// If jitterFactor is not positive, the period is unchanged. // Catches any panics, and keeps going. f may not be invoked if // stop channel is already closed. Pass NeverStop to Until if you // don't want it stop. -func Until(f func(), period time.Duration, stopCh <-chan struct{}) { +func JitterUntil(f func(), period time.Duration, jitterFactor float64, stopCh <-chan struct{}) { select { case <-stopCh: return @@ -57,10 +65,16 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) { defer runtime.HandleCrash() f() }() + + jitteredPeriod := period + if jitterFactor > 0.0 { + jitteredPeriod = Jitter(period, jitterFactor) + } + select { case <-stopCh: return - case <-time.After(period): + case <-time.After(jitteredPeriod): } } } From 8e270922be6f9546d222e64315a6e739e9e9a8ab Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Fri, 22 Jan 2016 15:08:56 +0100 Subject: [PATCH 2/2] Add unit test for JitterUntil --- pkg/util/wait/wait_test.go | 59 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go index cd647892091..28de7e01ba6 100644 --- a/pkg/util/wait/wait_test.go +++ b/pkg/util/wait/wait_test.go @@ -56,6 +56,65 @@ func TestUntilReturnsImmediately(t *testing.T) { } } +func TestJitterUntil(t *testing.T) { + ch := make(chan struct{}) + // if a channel is closed JitterUntil never calls function f + // and returns imidiatelly + close(ch) + JitterUntil(func() { + t.Fatal("should not have been invoked") + }, 0, 1.0, ch) + + ch = make(chan struct{}) + called := make(chan struct{}) + go func() { + JitterUntil(func() { + called <- struct{}{} + }, 0, 1.0, ch) + close(called) + }() + <-called + close(ch) + <-called +} + +func TestJitterUntilReturnsImmediately(t *testing.T) { + now := time.Now() + ch := make(chan struct{}) + JitterUntil(func() { + close(ch) + }, 30*time.Second, 1.0, ch) + if now.Add(25 * time.Second).Before(time.Now()) { + t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func") + } +} + +func TestJitterUntilNegativeFactor(t *testing.T) { + now := time.Now() + ch := make(chan struct{}) + called := make(chan struct{}) + received := make(chan struct{}) + go func() { + JitterUntil(func() { + called <- struct{}{} + <-received + }, time.Second, -30.0, ch) + }() + // first loop + <-called + received <- struct{}{} + // second loop + <-called + close(ch) + received <- struct{}{} + + // it should take at most 2 seconds + some overhead, not 3 + if now.Add(3 * time.Second).Before(time.Now()) { + t.Errorf("JitterUntil did not returned after predefined period with negative jitter factor when the stop chan was closed inside the func") + } + +} + func TestExponentialBackoff(t *testing.T) { opts := Backoff{Factor: 1.0, Steps: 3}