Merge pull request #26301 from zmerlynn/wait_proper

Automatic merge from submit-queue

routecontroller: Add wait.NonSlidingUntil, use it

[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/.github/PULL_REQUEST_TEMPLATE.md?pixel)]() Make sure the reconciliation loop kicks in again immediately if it
takes a loooooong time.
This commit is contained in:
k8s-merge-robot 2016-05-26 03:29:21 -07:00
commit 98766f4548
4 changed files with 62 additions and 15 deletions

View File

@ -50,7 +50,7 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterNam
} }
func (rc *RouteController) Run(syncPeriod time.Duration) { func (rc *RouteController) Run(syncPeriod time.Duration) {
go wait.Until(func() { go wait.NonSlidingUntil(func() {
if err := rc.reconcileNodeRoutes(); err != nil { if err := rc.reconcileNodeRoutes(); err != nil {
glog.Errorf("Couldn't reconcile node routes: %v", err) glog.Errorf("Couldn't reconcile node routes: %v", err)
} }

View File

@ -61,7 +61,7 @@ func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator {
s.startO.Do(func() { s.startO.Do(func() {
go wait.JitterUntil(func() { go wait.JitterUntil(func() {
s.calcAndStoreStats() s.calcAndStoreStats()
}, s.jitterPeriod, 1.0, s.stopChannel) }, s.jitterPeriod, 1.0, true, s.stopChannel)
}) })
return s return s
} }

View File

@ -42,9 +42,19 @@ func Forever(f func(), period time.Duration) {
} }
// Until loops until stop channel is closed, running f every period. // Until loops until stop channel is closed, running f every period.
// Until is syntactic sugar on top of JitterUntil with zero jitter factor // Until is syntactic sugar on top of JitterUntil with zero jitter
// factor, with sliding = true (which means the timer for period
// starts after the f completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) { func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, stopCh) JitterUntil(f, period, 0.0, true, stopCh)
}
// NonSlidingUntil loops until stop channel is closed, running f every
// period. NonSlidingUntil is syntactic sugar on top of JitterUntil
// with zero jitter factor, with sliding = false (meaning the timer for
// period starts at the same time as the function starts).
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, false, stopCh)
} }
// JitterUntil loops until stop channel is closed, running f every period. // JitterUntil loops until stop channel is closed, running f every period.
@ -53,7 +63,7 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
// Catches any panics, and keeps going. f may not be invoked if // Catches any panics, and keeps going. f may not be invoked if
// stop channel is already closed. Pass NeverStop to Until if you // stop channel is already closed. Pass NeverStop to Until if you
// don't want it stop. // don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, stopCh <-chan struct{}) { func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
select { select {
case <-stopCh: case <-stopCh:
return return
@ -61,20 +71,37 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, stopCh <-
} }
for { for {
func() {
defer runtime.HandleCrash()
f()
}()
jitteredPeriod := period jitteredPeriod := period
if jitterFactor > 0.0 { if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor) jitteredPeriod = Jitter(period, jitterFactor)
} }
var t *time.Timer
if !sliding {
t = time.NewTimer(jitteredPeriod)
}
func() {
defer runtime.HandleCrash()
f()
}()
if sliding {
t = time.NewTimer(jitteredPeriod)
} else {
// The timer we created could already have fired, so be
// careful and check stopCh first.
select {
case <-stopCh:
return
default:
}
}
select { select {
case <-stopCh: case <-stopCh:
return return
case <-time.After(jitteredPeriod): case <-t.C:
} }
} }
} }

View File

@ -45,6 +45,26 @@ func TestUntil(t *testing.T) {
<-called <-called
} }
func TestNonSlidingUntil(t *testing.T) {
ch := make(chan struct{})
close(ch)
NonSlidingUntil(func() {
t.Fatal("should not have been invoked")
}, 0, ch)
ch = make(chan struct{})
called := make(chan struct{})
go func() {
NonSlidingUntil(func() {
called <- struct{}{}
}, 0, ch)
close(called)
}()
<-called
close(ch)
<-called
}
func TestUntilReturnsImmediately(t *testing.T) { func TestUntilReturnsImmediately(t *testing.T) {
now := time.Now() now := time.Now()
ch := make(chan struct{}) ch := make(chan struct{})
@ -63,14 +83,14 @@ func TestJitterUntil(t *testing.T) {
close(ch) close(ch)
JitterUntil(func() { JitterUntil(func() {
t.Fatal("should not have been invoked") t.Fatal("should not have been invoked")
}, 0, 1.0, ch) }, 0, 1.0, true, ch)
ch = make(chan struct{}) ch = make(chan struct{})
called := make(chan struct{}) called := make(chan struct{})
go func() { go func() {
JitterUntil(func() { JitterUntil(func() {
called <- struct{}{} called <- struct{}{}
}, 0, 1.0, ch) }, 0, 1.0, true, ch)
close(called) close(called)
}() }()
<-called <-called
@ -83,7 +103,7 @@ func TestJitterUntilReturnsImmediately(t *testing.T) {
ch := make(chan struct{}) ch := make(chan struct{})
JitterUntil(func() { JitterUntil(func() {
close(ch) close(ch)
}, 30*time.Second, 1.0, ch) }, 30*time.Second, 1.0, true, ch)
if now.Add(25 * time.Second).Before(time.Now()) { if now.Add(25 * time.Second).Before(time.Now()) {
t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func") t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func")
} }
@ -98,7 +118,7 @@ func TestJitterUntilNegativeFactor(t *testing.T) {
JitterUntil(func() { JitterUntil(func() {
called <- struct{}{} called <- struct{}{}
<-received <-received
}, time.Second, -30.0, ch) }, time.Second, -30.0, true, ch)
}() }()
// first loop // first loop
<-called <-called