diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index a4786824755..5b43dc650ad 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -35,7 +35,7 @@ import ( const maxTriesPerEvent = 12 -var sleepDuration = 10 * time.Second +var defaultSleepDuration = 10 * time.Second const maxQueuedEvents = 1000 @@ -93,11 +93,16 @@ type EventBroadcaster interface { // Creates a new event broadcaster. func NewBroadcaster() EventBroadcaster { - return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)} + return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration} +} + +func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { + return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration} } type eventBroadcasterImpl struct { *watch.Broadcaster + sleepDuration time.Duration } // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. @@ -110,11 +115,11 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin eventCorrelator := NewEventCorrelator(util.RealClock{}) return eventBroadcaster.StartEventWatcher( func(event *api.Event) { - recordToSink(sink, event, eventCorrelator, randGen) + recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration) }) } -func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand) { +func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index 8c482f55935..333a7b4bc5b 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -33,11 +33,6 @@ import ( "k8s.io/kubernetes/pkg/util/strategicpatch" ) -func init() { - // Don't bother sleeping between retries. - sleepDuration = 0 -} - type testEventSink struct { OnCreate func(e *api.Event) (*api.Event, error) OnUpdate func(e *api.Event) (*api.Event, error) @@ -346,7 +341,7 @@ func TestEventf(t *testing.T) { }, OnPatch: OnPatchFactory(testCache, patchEvent), } - eventBroadcaster := NewBroadcaster() + eventBroadcaster := NewBroadcasterForTests(0) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) clock := util.NewFakeClock(time.Now()) @@ -431,7 +426,7 @@ func TestWriteEventError(t *testing.T) { }, } ev := &api.Event{} - recordToSink(sink, ev, eventCorrelator, randGen) + recordToSink(sink, ev, eventCorrelator, randGen, 0) if attempts != ent.attemptsWanted { t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts) } @@ -460,7 +455,7 @@ func TestLotsOfEvents(t *testing.T) { }, } - eventBroadcaster := NewBroadcaster() + eventBroadcaster := NewBroadcasterForTests(0) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { loggerCalled <- struct{}{} @@ -557,7 +552,7 @@ func TestEventfNoNamespace(t *testing.T) { }, OnPatch: OnPatchFactory(testCache, patchEvent), } - eventBroadcaster := NewBroadcaster() + eventBroadcaster := NewBroadcasterForTests(0) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) clock := util.NewFakeClock(time.Now()) @@ -846,7 +841,7 @@ func TestMultiSinkCache(t *testing.T) { OnPatch: OnPatchFactory(testCache2, patchEvent2), } - eventBroadcaster := NewBroadcaster() + eventBroadcaster := NewBroadcasterForTests(0) clock := util.NewFakeClock(time.Now()) recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 3ba83dbdf90..d066ce48e48 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -65,19 +65,8 @@ type HorizontalController struct { var downscaleForbiddenWindow = 5 * time.Minute var upscaleForbiddenWindow = 3 * time.Minute -func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedextensions.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{evtNamespacer.Events("")}) - recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) - - controller := &HorizontalController{ - metricsClient: metricsClient, - eventRecorder: recorder, - scaleNamespacer: scaleNamespacer, - hpaNamespacer: hpaNamespacer, - } - - controller.store, controller.controller = framework.NewInformer( +func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, *framework.Controller) { + return framework.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return controller.hpaNamespacer.HorizontalPodAutoscalers(api.NamespaceAll).List(options) @@ -111,6 +100,22 @@ func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNa // We are not interested in deletions. }, ) +} + +func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedextensions.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController { + broadcaster := record.NewBroadcaster() + broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{evtNamespacer.Events("")}) + recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) + + controller := &HorizontalController{ + metricsClient: metricsClient, + eventRecorder: recorder, + scaleNamespacer: scaleNamespacer, + hpaNamespacer: hpaNamespacer, + } + store, frameworkController := newInformer(controller, resyncPeriod) + controller.store = store + controller.controller = frameworkController return controller } diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go index 7b0be4f6a78..e86f469b8a7 100644 --- a/pkg/controller/podautoscaler/horizontal_test.go +++ b/pkg/controller/podautoscaler/horizontal_test.go @@ -29,8 +29,10 @@ import ( _ "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/testing/core" + unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" "k8s.io/kubernetes/pkg/runtime" @@ -329,13 +331,29 @@ func (tc *testCase) verifyResults(t *testing.T) { func (tc *testCase) runTest(t *testing.T) { testClient := tc.prepareTestClient(t) metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort) - hpaController := NewHorizontalController(testClient.Core(), testClient.Extensions(), testClient.Extensions(), metricsClient, 0) + + broadcaster := record.NewBroadcasterForTests(0) + broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{testClient.Core().Events("")}) + recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) + + hpaController := &HorizontalController{ + metricsClient: metricsClient, + eventRecorder: recorder, + scaleNamespacer: testClient.Extensions(), + hpaNamespacer: testClient.Extensions(), + } + + store, frameworkController := newInformer(hpaController, time.Minute) + hpaController.store = store + hpaController.controller = frameworkController + stop := make(chan struct{}) defer close(stop) go hpaController.Run(stop) + if tc.verifyEvents { // We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration). - time.Sleep(12 * time.Second) + time.Sleep(2 * time.Second) } // Wait for HPA to be processed. <-tc.processed