diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go index fbb89b8e..e62d2cce 100644 --- a/tools/events/event_broadcaster.go +++ b/tools/events/event_broadcaster.go @@ -184,19 +184,21 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C Count: 1, LastObservedTime: metav1.MicroTime{Time: clock.Now()}, } - return isomorphicEvent + // Make a copy of the Event to make sure that recording it + // doesn't mess with the object stored in cache. + return isomorphicEvent.DeepCopy() } e.eventCache[eventKey] = eventCopy - return eventCopy + // Make a copy of the Event to make sure that recording it doesn't + // mess with the object stored in cache. + return eventCopy.DeepCopy() }() if evToRecord != nil { - recordedEvent := e.attemptRecording(evToRecord) - if recordedEvent != nil { - recordedEventKey := getKey(recordedEvent) - e.mu.Lock() - defer e.mu.Unlock() - e.eventCache[recordedEventKey] = recordedEvent - } + // TODO: Add a metric counting the number of recording attempts + e.attemptRecording(evToRecord) + // We don't want the new recorded Event to be reflected in the + // client's cache because server-side mutations could mess with the + // aggregation mechanism used by the client. } }() } diff --git a/tools/events/eventseries_test.go b/tools/events/eventseries_test.go index 08cc9b26..b51811c9 100644 --- a/tools/events/eventseries_test.go +++ b/tools/events/eventseries_test.go @@ -17,6 +17,7 @@ limitations under the License. package events import ( + "context" "strconv" "testing" "time" @@ -29,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + fake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" @@ -185,6 +187,44 @@ func TestEventSeriesf(t *testing.T) { close(stopCh) } +// TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to +// an EventSink consecutively there is no data race. This test is meant to be +// run with the `-race` option. +func TestEventSeriesWithEventSinkImplRace(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + + eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()} + eventBroadcaster := NewBroadcaster(eventSink) + + stopCh := make(chan struct{}) + eventBroadcaster.StartRecordingToSink(stopCh) + + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "test") + + recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "") + recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "") + + err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { + events, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + + if len(events.Items) != 1 { + return false, nil + } + + if events.Items[0].Series == nil { + return false, nil + } + + return true, nil + }) + if err != nil { + t.Fatal("expected that 2 identical Eventf calls would result in the creation of an Event with a Serie") + } +} + func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) { recvEvent := *actualEvent