Merge pull request #22426 from mwielgus/sleep-duration

Auto commit by PR queue bot
k8s-merge-robot 2016-03-25 06:51:00 -07:00
commit c579618097
4 changed files with 52 additions and 29 deletions

View File

@@ -35,7 +35,7 @@ import (
 const maxTriesPerEvent = 12
-var sleepDuration = 10 * time.Second
+var defaultSleepDuration = 10 * time.Second
 const maxQueuedEvents = 1000
@@ -93,11 +93,16 @@ type EventBroadcaster interface {
 // Creates a new event broadcaster.
 func NewBroadcaster() EventBroadcaster {
-	return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)}
+	return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
+}
+
+func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
+	return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
 }
 
 type eventBroadcasterImpl struct {
 	*watch.Broadcaster
+	sleepDuration time.Duration
 }
 
 // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
@@ -110,11 +115,11 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink
 	eventCorrelator := NewEventCorrelator(util.RealClock{})
 	return eventBroadcaster.StartEventWatcher(
 		func(event *api.Event) {
-			recordToSink(sink, event, eventCorrelator, randGen)
+			recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
 		})
 }
 
-func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand) {
+func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
 	// Make a copy before modification, because there could be multiple listeners.
 	// Events are safe to copy like this.
 	eventCopy := *event
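
With the sleep carried on the broadcaster instead of a mutable package variable, a test constructs a zero-sleep broadcaster via NewBroadcasterForTests rather than overwriting sleepDuration in an init(). A minimal sketch of that usage, built only from identifiers that appear elsewhere in this PR; the helper name, component string, and clientset parameter are illustrative, not part of the change:

// Hypothetical test helper; the import paths are the ones added in the test diffs below.
import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/client/record"
	unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
)

// newZeroSleepRecorder returns a recorder whose failed sink writes are retried
// immediately, instead of waiting the default 10s between attempts.
func newZeroSleepRecorder(c internalclientset.Interface) record.EventRecorder {
	broadcaster := record.NewBroadcasterForTests(0)
	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{c.Core().Events("")})
	return broadcaster.NewRecorder(api.EventSource{Component: "test"})
}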

View File

@@ -33,11 +33,6 @@ import (
 	"k8s.io/kubernetes/pkg/util/strategicpatch"
 )
 
-func init() {
-	// Don't bother sleeping between retries.
-	sleepDuration = 0
-}
-
 type testEventSink struct {
 	OnCreate func(e *api.Event) (*api.Event, error)
 	OnUpdate func(e *api.Event) (*api.Event, error)
@@ -346,7 +341,7 @@ func TestEventf(t *testing.T) {
 		},
 		OnPatch: OnPatchFactory(testCache, patchEvent),
 	}
-	eventBroadcaster := NewBroadcaster()
+	eventBroadcaster := NewBroadcasterForTests(0)
 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
 
 	clock := util.NewFakeClock(time.Now())
@@ -431,7 +426,7 @@ func TestWriteEventError(t *testing.T) {
 			},
 		}
 		ev := &api.Event{}
-		recordToSink(sink, ev, eventCorrelator, randGen)
+		recordToSink(sink, ev, eventCorrelator, randGen, 0)
 		if attempts != ent.attemptsWanted {
 			t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts)
 		}
@@ -460,7 +455,7 @@ func TestLotsOfEvents(t *testing.T) {
 		},
 	}
-	eventBroadcaster := NewBroadcaster()
+	eventBroadcaster := NewBroadcasterForTests(0)
 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
 	logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
 		loggerCalled <- struct{}{}
@@ -557,7 +552,7 @@ func TestEventfNoNamespace(t *testing.T) {
 		},
 		OnPatch: OnPatchFactory(testCache, patchEvent),
 	}
-	eventBroadcaster := NewBroadcaster()
+	eventBroadcaster := NewBroadcasterForTests(0)
 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
 
 	clock := util.NewFakeClock(time.Now())
@@ -846,7 +841,7 @@ func TestMultiSinkCache(t *testing.T) {
 		OnPatch: OnPatchFactory(testCache2, patchEvent2),
 	}
-	eventBroadcaster := NewBroadcaster()
+	eventBroadcaster := NewBroadcasterForTests(0)
 	clock := util.NewFakeClock(time.Now())
 	recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)

View File

@@ -65,19 +65,8 @@ type HorizontalController struct {
 var downscaleForbiddenWindow = 5 * time.Minute
 var upscaleForbiddenWindow = 3 * time.Minute
 
-func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedextensions.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController {
-	broadcaster := record.NewBroadcaster()
-	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{evtNamespacer.Events("")})
-	recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"})
-
-	controller := &HorizontalController{
-		metricsClient:   metricsClient,
-		eventRecorder:   recorder,
-		scaleNamespacer: scaleNamespacer,
-		hpaNamespacer:   hpaNamespacer,
-	}
-
-	controller.store, controller.controller = framework.NewInformer(
+func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, *framework.Controller) {
+	return framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 				return controller.hpaNamespacer.HorizontalPodAutoscalers(api.NamespaceAll).List(options)
@@ -111,6 +100,22 @@ func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedextensions.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController {
 			// We are not interested in deletions.
 		},
 	)
+}
+
+func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedextensions.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController {
+	broadcaster := record.NewBroadcaster()
+	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{evtNamespacer.Events("")})
+	recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"})
+
+	controller := &HorizontalController{
+		metricsClient:   metricsClient,
+		eventRecorder:   recorder,
+		scaleNamespacer: scaleNamespacer,
+		hpaNamespacer:   hpaNamespacer,
+	}
+	store, frameworkController := newInformer(controller, resyncPeriod)
+	controller.store = store
+	controller.controller = frameworkController
 
 	return controller
 }
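
Splitting newInformer out of NewHorizontalController lets a caller assemble the HorizontalController struct by hand and attach the informer afterwards, which is what allows the test below to inject a zero-sleep broadcaster and its own resync period. A rough in-package sketch of that wiring; the helper name and parameters are illustrative, while the field names and newInformer signature come from the hunks above:

// Hypothetical helper in the podautoscaler package (it touches unexported fields).
func newControllerForTest(metricsClient metrics.MetricsClient, scales unversionedextensions.ScalesGetter, hpas unversionedextensions.HorizontalPodAutoscalersGetter, recorder record.EventRecorder, resync time.Duration) *HorizontalController {
	controller := &HorizontalController{
		metricsClient:   metricsClient,
		eventRecorder:   recorder,
		scaleNamespacer: scales,
		hpaNamespacer:   hpas,
	}
	// newInformer's list/watch and handler closures need the controller,
	// so the informer is wired up after the struct exists.
	controller.store, controller.controller = newInformer(controller, resync)
	return controller
}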

View File

@@ -29,8 +29,10 @@ import (
 	_ "k8s.io/kubernetes/pkg/apimachinery/registered"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/client/restclient"
 	"k8s.io/kubernetes/pkg/client/testing/core"
+	unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
 	"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
 	"k8s.io/kubernetes/pkg/runtime"
@@ -329,13 +331,29 @@ func (tc *testCase) verifyResults(t *testing.T) {
 func (tc *testCase) runTest(t *testing.T) {
 	testClient := tc.prepareTestClient(t)
 	metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
-	hpaController := NewHorizontalController(testClient.Core(), testClient.Extensions(), testClient.Extensions(), metricsClient, 0)
+
+	broadcaster := record.NewBroadcasterForTests(0)
+	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{testClient.Core().Events("")})
+	recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"})
+
+	hpaController := &HorizontalController{
+		metricsClient:   metricsClient,
+		eventRecorder:   recorder,
+		scaleNamespacer: testClient.Extensions(),
+		hpaNamespacer:   testClient.Extensions(),
+	}
+
+	store, frameworkController := newInformer(hpaController, time.Minute)
+	hpaController.store = store
+	hpaController.controller = frameworkController
+
 	stop := make(chan struct{})
 	defer close(stop)
 	go hpaController.Run(stop)
 
 	if tc.verifyEvents {
 		// We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration).
-		time.Sleep(12 * time.Second)
+		time.Sleep(2 * time.Second)
 	}
 	// Wait for HPA to be processed.
 	<-tc.processed