From 3ec25c54252568b0e3d97feffc9363c9f1e6ff1f Mon Sep 17 00:00:00 2001 From: Zach Loafman Date: Wed, 25 May 2016 13:58:35 -0700 Subject: [PATCH] routecontroller: Add wait.NonSlidingUntil, use it Make sure the reconciliation loop kicks in again immediately if it takes a loooooong time. --- pkg/controller/route/routecontroller.go | 2 +- .../server/stats/volume_stat_caculator.go | 2 +- pkg/util/wait/wait.go | 45 +++++++++++++++---- pkg/util/wait/wait_test.go | 28 ++++++++++-- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/pkg/controller/route/routecontroller.go b/pkg/controller/route/routecontroller.go index 206a5c79fbc..1aeaed2d742 100644 --- a/pkg/controller/route/routecontroller.go +++ b/pkg/controller/route/routecontroller.go @@ -50,7 +50,7 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterNam } func (rc *RouteController) Run(syncPeriod time.Duration) { - go wait.Until(func() { + go wait.NonSlidingUntil(func() { if err := rc.reconcileNodeRoutes(); err != nil { glog.Errorf("Couldn't reconcile node routes: %v", err) } diff --git a/pkg/kubelet/server/stats/volume_stat_caculator.go b/pkg/kubelet/server/stats/volume_stat_caculator.go index cb15a0453b9..65bc6254ce1 100644 --- a/pkg/kubelet/server/stats/volume_stat_caculator.go +++ b/pkg/kubelet/server/stats/volume_stat_caculator.go @@ -61,7 +61,7 @@ func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator { s.startO.Do(func() { go wait.JitterUntil(func() { s.calcAndStoreStats() - }, s.jitterPeriod, 1.0, s.stopChannel) + }, s.jitterPeriod, 1.0, true, s.stopChannel) }) return s } diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index ef910538b99..b56560e75c2 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -42,9 +42,19 @@ func Forever(f func(), period time.Duration) { } // Until loops until stop channel is closed, running f every period. -// Until is syntactic sugar on top of JitterUntil with zero jitter factor +// Until is syntactic sugar on top of JitterUntil with zero jitter +// factor, with sliding = true (which means the timer for period +// starts after the f completes). func Until(f func(), period time.Duration, stopCh <-chan struct{}) { - JitterUntil(f, period, 0.0, stopCh) + JitterUntil(f, period, 0.0, true, stopCh) +} + +// NonSlidingUntil loops until stop channel is closed, running f every +// period. NonSlidingUntil is syntactic sugar on top of JitterUntil +// with zero jitter factor, with sliding = false (meaning the timer for +// period starts at the same time as the function starts). +func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) { + JitterUntil(f, period, 0.0, false, stopCh) } // JitterUntil loops until stop channel is closed, running f every period. @@ -53,7 +63,7 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) { // Catches any panics, and keeps going. f may not be invoked if // stop channel is already closed. Pass NeverStop to Until if you // don't want it stop. -func JitterUntil(f func(), period time.Duration, jitterFactor float64, stopCh <-chan struct{}) { +func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { select { case <-stopCh: return @@ -61,20 +71,37 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, stopCh <- } for { - func() { - defer runtime.HandleCrash() - f() - }() - jitteredPeriod := period if jitterFactor > 0.0 { jitteredPeriod = Jitter(period, jitterFactor) } + var t *time.Timer + if !sliding { + t = time.NewTimer(jitteredPeriod) + } + + func() { + defer runtime.HandleCrash() + f() + }() + + if sliding { + t = time.NewTimer(jitteredPeriod) + } else { + // The timer we created could already have fired, so be + // careful and check stopCh first. + select { + case <-stopCh: + return + default: + } + } + select { case <-stopCh: return - case <-time.After(jitteredPeriod): + case <-t.C: } } } diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go index 28de7e01ba6..da532cf3253 100644 --- a/pkg/util/wait/wait_test.go +++ b/pkg/util/wait/wait_test.go @@ -45,6 +45,26 @@ func TestUntil(t *testing.T) { <-called } +func TestNonSlidingUntil(t *testing.T) { + ch := make(chan struct{}) + close(ch) + NonSlidingUntil(func() { + t.Fatal("should not have been invoked") + }, 0, ch) + + ch = make(chan struct{}) + called := make(chan struct{}) + go func() { + NonSlidingUntil(func() { + called <- struct{}{} + }, 0, ch) + close(called) + }() + <-called + close(ch) + <-called +} + func TestUntilReturnsImmediately(t *testing.T) { now := time.Now() ch := make(chan struct{}) @@ -63,14 +83,14 @@ func TestJitterUntil(t *testing.T) { close(ch) JitterUntil(func() { t.Fatal("should not have been invoked") - }, 0, 1.0, ch) + }, 0, 1.0, true, ch) ch = make(chan struct{}) called := make(chan struct{}) go func() { JitterUntil(func() { called <- struct{}{} - }, 0, 1.0, ch) + }, 0, 1.0, true, ch) close(called) }() <-called @@ -83,7 +103,7 @@ func TestJitterUntilReturnsImmediately(t *testing.T) { ch := make(chan struct{}) JitterUntil(func() { close(ch) - }, 30*time.Second, 1.0, ch) + }, 30*time.Second, 1.0, true, ch) if now.Add(25 * time.Second).Before(time.Now()) { t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func") } @@ -98,7 +118,7 @@ func TestJitterUntilNegativeFactor(t *testing.T) { JitterUntil(func() { called <- struct{}{} <-received - }, time.Second, -30.0, ch) + }, time.Second, -30.0, true, ch) }() // first loop <-called