diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go index 3c6870a2..bde888ee 100644 --- a/tools/events/event_broadcaster.go +++ b/tools/events/event_broadcaster.go @@ -307,14 +307,7 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime return watcher.Stop } -// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. -func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { - go wait.Until(func() { - e.refreshExistingEventSeries() - }, refreshTime, stopCh) - go wait.Until(func() { - e.finishSeries() - }, finishTime, stopCh) +func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) { eventHandler := func(obj runtime.Object) { event, ok := obj.(*eventsv1.Event) if !ok { @@ -330,6 +323,13 @@ func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { }() } +// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. +func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { + go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh) + go wait.Until(e.finishSeries, finishTime, stopCh) + e.startRecordingEvents(stopCh) +} + type eventBroadcasterAdapterImpl struct { coreClient typedv1core.EventsGetter coreBroadcaster record.EventBroadcaster diff --git a/tools/events/eventseries_test.go b/tools/events/eventseries_test.go index 39c2ae71..3fe01a29 100644 --- a/tools/events/eventseries_test.go +++ b/tools/events/eventseries_test.go @@ -69,7 +69,6 @@ func TestEventSeriesf(t *testing.T) { testPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - SelfLink: "/api/v1/namespaces/baz/pods/foo", Name: "foo", Namespace: "baz", UID: "bar", @@ -158,7 +157,11 @@ func TestEventSeriesf(t *testing.T) { } eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*eventsv1.Event{}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest") - eventBroadcaster.StartRecordingToSink(stopCh) + broadcaster := eventBroadcaster.(*eventBroadcasterImpl) + // Don't call StartRecordingToSink, as we don't need neither refreshing event + // series nor finishing them in this tests and additional events updated would + // race with our expected ones. + broadcaster.startRecordingEvents(stopCh) recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1}) // read from the chan as this was needed only to populate the cache <-createEvent