diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 9e19d5d8f1b..c9bdb06a9ba 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -197,7 +197,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas). Run(3, wait.NeverStop) - nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), + nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(5 * time.Second) cadvisorInterface := new(cadvisor.Fake) diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 79d39a37a54..74ea2e008cd 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -409,7 +409,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, item.fakeNodeHandler, - evictionTimeout, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, + evictionTimeout, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } for _, ds := range item.daemonSets { @@ -659,8 +659,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for i, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), + util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -809,8 +809,8 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { } for i, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), + util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("Case[%d] unexpected error: %v", i, err) @@ -891,7 +891,7 @@ func TestNodeDeletion(t *testing.T) { Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), } - nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), + nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { diff --git a/pkg/controller/node/rate_limited_queue_test.go b/pkg/controller/node/rate_limited_queue_test.go index e0bccd54e92..a04711466ff 100644 --- a/pkg/controller/node/rate_limited_queue_test.go +++ b/pkg/controller/node/rate_limited_queue_test.go @@ -39,7 +39,7 @@ func CheckSetEq(lhs, rhs sets.String) bool { } func TestAddNode(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter()) + evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -62,7 +62,7 @@ func TestAddNode(t *testing.T) { } func TestDelNode(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter()) + evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -84,7 +84,7 @@ func TestDelNode(t *testing.T) { t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern) } - evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter()) + evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -106,7 +106,7 @@ func TestDelNode(t *testing.T) { t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern) } - evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter()) + evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -130,7 +130,7 @@ func TestDelNode(t *testing.T) { } func TestTry(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter()) + evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -152,7 +152,7 @@ func TestTry(t *testing.T) { } func TestTryOrdering(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter()) + evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -184,7 +184,7 @@ func TestTryOrdering(t *testing.T) { } func TestTryRemovingWhileTry(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter()) + evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") diff --git a/pkg/util/throttle.go b/pkg/util/throttle.go index 54a2fe58dae..c1caea099fe 100644 --- a/pkg/util/throttle.go +++ b/pkg/util/throttle.go @@ -16,7 +16,11 @@ limitations under the License. package util -import "github.com/juju/ratelimit" +import ( + "sync" + + "github.com/juju/ratelimit" +) type RateLimiter interface { // TryAccept returns true if a token is taken immediately. Otherwise, @@ -47,12 +51,6 @@ func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { return &tokenBucketRateLimiter{limiter} } -type fakeRateLimiter struct{} - -func NewFakeRateLimiter() RateLimiter { - return &fakeRateLimiter{} -} - func (t *tokenBucketRateLimiter) TryAccept() bool { return t.limiter.TakeAvailable(1) == 1 } @@ -71,14 +69,48 @@ func (t *tokenBucketRateLimiter) Accept() { func (t *tokenBucketRateLimiter) Stop() { } -func (t *fakeRateLimiter) TryAccept() bool { +type fakeAlwaysRateLimiter struct{} + +func NewFakeAlwaysRateLimiter() RateLimiter { + return &fakeAlwaysRateLimiter{} +} + +func (t *fakeAlwaysRateLimiter) TryAccept() bool { return true } -func (t *fakeRateLimiter) Saturation() float64 { +func (t *fakeAlwaysRateLimiter) Saturation() float64 { return 0 } -func (t *fakeRateLimiter) Stop() {} +func (t *fakeAlwaysRateLimiter) Stop() {} -func (t *fakeRateLimiter) Accept() {} +func (t *fakeAlwaysRateLimiter) Accept() {} + +type fakeNeverRateLimiter struct { + wg sync.WaitGroup +} + +func NewFakeNeverRateLimiter() RateLimiter { + wg := sync.WaitGroup{} + wg.Add(1) + return &fakeNeverRateLimiter{ + wg: wg, + } +} + +func (t *fakeNeverRateLimiter) TryAccept() bool { + return false +} + +func (t *fakeNeverRateLimiter) Saturation() float64 { + return 1 +} + +func (t *fakeNeverRateLimiter) Stop() { + t.wg.Done() +} + +func (t *fakeNeverRateLimiter) Accept() { + t.wg.Wait() +} diff --git a/pkg/util/throttle_test.go b/pkg/util/throttle_test.go index 089d7087191..ca0e9ac2343 100644 --- a/pkg/util/throttle_test.go +++ b/pkg/util/throttle_test.go @@ -18,6 +18,7 @@ package util import ( "math" + "sync" "testing" "time" ) @@ -87,3 +88,40 @@ func TestRateLimiterSaturation(t *testing.T) { } } } + +func TestAlwaysFake(t *testing.T) { + rl := NewFakeAlwaysRateLimiter() + if !rl.TryAccept() { + t.Error("TryAccept in AlwaysFake should return true.") + } + // If this will block the test will timeout + rl.Accept() +} + +func TestNeverFake(t *testing.T) { + rl := NewFakeNeverRateLimiter() + if rl.TryAccept() { + t.Error("TryAccept in NeverFake should return false.") + } + + finished := false + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + rl.Accept() + finished = true + wg.Done() + }() + + // Wait some time to make sure it never finished. + time.Sleep(time.Second) + if finished { + t.Error("Accept should block forever in NeverFake.") + } + + rl.Stop() + wg.Wait() + if !finished { + t.Error("Stop should make Accept unblock in NeverFake.") + } +} diff --git a/test/e2e/service_latency.go b/test/e2e/service_latency.go index 2f10065ece9..6f7620bbe11 100644 --- a/test/e2e/service_latency.go +++ b/test/e2e/service_latency.go @@ -66,7 +66,7 @@ var _ = Describe("Service endpoints latency", func() { // Turn off rate limiting--it interferes with our measurements. oldThrottle := f.Client.RESTClient.Throttle - f.Client.RESTClient.Throttle = util.NewFakeRateLimiter() + f.Client.RESTClient.Throttle = util.NewFakeAlwaysRateLimiter() defer func() { f.Client.RESTClient.Throttle = oldThrottle }() failing := sets.NewString()