From 43a47a82348202db94320afefafa033333a437e0 Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Tue, 2 Feb 2016 10:28:44 +0100 Subject: [PATCH] Move Until, Forever, NeverStop, ForeverTestTimeout from util to wait --- pkg/util/logs.go | 3 ++- pkg/util/util.go | 43 -------------------------------------- pkg/util/util_test.go | 32 ---------------------------- pkg/util/wait/wait.go | 43 ++++++++++++++++++++++++++++++++++++++ pkg/util/wait/wait_test.go | 41 ++++++++++++++++++++++++++++++------ 5 files changed, 80 insertions(+), 82 deletions(-) diff --git a/pkg/util/logs.go b/pkg/util/logs.go index 37b9b767b2c..c79c4903d1b 100644 --- a/pkg/util/logs.go +++ b/pkg/util/logs.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" "github.com/spf13/pflag" + "k8s.io/kubernetes/pkg/util/wait" ) var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes") @@ -46,7 +47,7 @@ func InitLogs() { log.SetOutput(GlogWriter{}) log.SetFlags(0) // The default glog flush interval is 30 seconds, which is frighteningly long. - go Until(glog.Flush, *logFlushFreq, NeverStop) + go wait.Until(glog.Flush, *logFlushFreq, wait.NeverStop) } // FlushLogs flushes logs immediately. diff --git a/pkg/util/util.go b/pkg/util/util.go index 48cc67c711b..63da8ef5e76 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -24,53 +24,10 @@ import ( "regexp" "strconv" "strings" - "time" "k8s.io/kubernetes/pkg/util/intstr" - "k8s.io/kubernetes/pkg/util/runtime" ) -// For any test of the style: -// ... -// <- time.After(timeout): -// t.Errorf("Timed out") -// The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s -// is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine -// (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test. -var ForeverTestTimeout = time.Second * 30 - -// NeverStop may be passed to Until to make it never stop. -var NeverStop <-chan struct{} = make(chan struct{}) - -// Forever is syntactic sugar on top of Until -func Forever(f func(), period time.Duration) { - Until(f, period, NeverStop) -} - -// Until loops until stop channel is closed, running f every period. -// Catches any panics, and keeps going. f may not be invoked if -// stop channel is already closed. Pass NeverStop to Until if you -// don't want it stop. -func Until(f func(), period time.Duration, stopCh <-chan struct{}) { - select { - case <-stopCh: - return - default: - } - - for { - func() { - defer runtime.HandleCrash() - f() - }() - select { - case <-stopCh: - return - case <-time.After(period): - } - } -} - func GetIntOrPercentValue(intOrStr *intstr.IntOrString) (int, bool, error) { switch intOrStr.Type { case intstr.Int: diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index b5ef5b682e8..7fd236e1e72 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -18,40 +18,8 @@ package util import ( "testing" - "time" ) -func TestUntil(t *testing.T) { - ch := make(chan struct{}) - close(ch) - Until(func() { - t.Fatal("should not have been invoked") - }, 0, ch) - - ch = make(chan struct{}) - called := make(chan struct{}) - go func() { - Until(func() { - called <- struct{}{} - }, 0, ch) - close(called) - }() - <-called - close(ch) - <-called -} - -func TestUntilReturnsImmediately(t *testing.T) { - now := time.Now() - ch := make(chan struct{}) - Until(func() { - close(ch) - }, 30*time.Second, ch) - if now.Add(25 * time.Second).Before(time.Now()) { - t.Errorf("Until did not return immediately when the stop chan was closed inside the func") - } -} - func TestStringDiff(t *testing.T) { diff := StringDiff("aaabb", "aaacc") expect := "aaa\n\nA: bb\n\nB: cc\n\n" diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index 275505f73ff..0005b00ecf9 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -20,8 +20,51 @@ import ( "errors" "math/rand" "time" + + "k8s.io/kubernetes/pkg/util/runtime" ) +// For any test of the style: +// ... +// <- time.After(timeout): +// t.Errorf("Timed out") +// The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s +// is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine +// (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test. +var ForeverTestTimeout = time.Second * 30 + +// NeverStop may be passed to Until to make it never stop. +var NeverStop <-chan struct{} = make(chan struct{}) + +// Forever is syntactic sugar on top of Until +func Forever(f func(), period time.Duration) { + Until(f, period, NeverStop) +} + +// Until loops until stop channel is closed, running f every period. +// Catches any panics, and keeps going. f may not be invoked if +// stop channel is already closed. Pass NeverStop to Until if you +// don't want it stop. +func Until(f func(), period time.Duration, stopCh <-chan struct{}) { + select { + case <-stopCh: + return + default: + } + + for { + func() { + defer runtime.HandleCrash() + f() + }() + select { + case <-stopCh: + return + case <-time.After(period): + } + } +} + // Jitter returns a time.Duration between duration and duration + maxFactor * duration, // to allow clients to avoid converging on periodic behavior. If maxFactor is 0.0, a // suggested default value will be chosen. diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go index 09dcb0c4900..cd647892091 100644 --- a/pkg/util/wait/wait_test.go +++ b/pkg/util/wait/wait_test.go @@ -23,10 +23,39 @@ import ( "sync/atomic" "testing" "time" - - "k8s.io/kubernetes/pkg/util" ) +func TestUntil(t *testing.T) { + ch := make(chan struct{}) + close(ch) + Until(func() { + t.Fatal("should not have been invoked") + }, 0, ch) + + ch = make(chan struct{}) + called := make(chan struct{}) + go func() { + Until(func() { + called <- struct{}{} + }, 0, ch) + close(called) + }() + <-called + close(ch) + <-called +} + +func TestUntilReturnsImmediately(t *testing.T) { + now := time.Now() + ch := make(chan struct{}) + Until(func() { + close(ch) + }, 30*time.Second, ch) + if now.Add(25 * time.Second).Before(time.Now()) { + t.Errorf("Until did not return immediately when the stop chan was closed inside the func") + } +} + func TestExponentialBackoff(t *testing.T) { opts := Backoff{Factor: 1.0, Steps: 3} @@ -87,7 +116,7 @@ DRAIN: break DRAIN } count++ - case <-time.After(util.ForeverTestTimeout): + case <-time.After(ForeverTestTimeout): t.Errorf("unexpected timeout after poll") } } @@ -233,7 +262,7 @@ func TestPollForever(t *testing.T) { if !open { t.Fatalf("did not expect channel to be closed") } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(ForeverTestTimeout): t.Fatalf("channel did not return at least once within the poll interval") } } @@ -313,14 +342,14 @@ func TestWaitFor(t *testing.T) { func TestWaitForWithDelay(t *testing.T) { done := make(chan struct{}) defer close(done) - WaitFor(poller(time.Millisecond, util.ForeverTestTimeout), func() (bool, error) { + WaitFor(poller(time.Millisecond, ForeverTestTimeout), func() (bool, error) { time.Sleep(10 * time.Millisecond) return true, nil }, done) // If polling goroutine doesn't see the done signal it will leak timers. select { case done <- struct{}{}: - case <-time.After(util.ForeverTestTimeout): + case <-time.After(ForeverTestTimeout): t.Errorf("expected an ack of the done signal.") } }