From 4b578e12427e14566083912c18946e132ed7d72a Mon Sep 17 00:00:00 2001 From: Damien Grisonnet Date: Thu, 1 Dec 2022 15:39:34 +0100 Subject: [PATCH] tools/events: fix data race when emitting series There was a data race in the recordToSink function that caused changes to the events cache to be overriden if events were emitted simultaneously via Eventf calls. The race lies in the fact that when recording an Event, there might be multiple calls updating the cache simultaneously. The lock period is optimized so that after updating the cache with the new Event, the lock is unlocked until the Event is recorded on the apiserver side and then the cache is locked again to be updated with the new value returned by the apiserver. The are a few problem with the approach: 1. If two identical Events are emitted successively the changes of the second Event will override the first one. In code the following happen: 1. Eventf(ev1) 2. Eventf(ev2) 3. Lock cache 4. Set cache[getKey(ev1)] = &ev1 5. Unlock cache 6. Lock cache 7. Update cache[getKey(ev2)] = &ev1 + Series{Count: 1} 8. Unlock cache 9. Start attempting to record the first event &ev1 on the apiserver side. This can be mitigated by recording a copy of the Event stored in cache instead of reusing the pointer from the cache. 2. When the Event has been recorded on the apiserver the cache is updated again with the value of the Event returned by the server. This update will override any changes made to the cache entry when attempting to record the new Event since the cache was unlocked at that time. This might lead to some inconsistencies when dealing with EventSeries since the count may be overriden or the client might even try to record the first isomorphic Event multiple time. This could be mitigated with a lock that has a larger scope, but we shouldn't want to reflect Event returned by the apiserver in the cache in the first place since mutation could mess with the aggregation by either allowing users to manipulate values to update a different cache entry or even having two cache entries for the same Events. Signed-off-by: Damien Grisonnet Kubernetes-commit: 55ec09d377274b4a6107fe0b7a061ad408fe05a7 --- tools/events/event_broadcaster.go | 20 +++++++++------- tools/events/eventseries_test.go | 40 +++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go index fbb89b8e..e62d2cce 100644 --- a/tools/events/event_broadcaster.go +++ b/tools/events/event_broadcaster.go @@ -184,19 +184,21 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C Count: 1, LastObservedTime: metav1.MicroTime{Time: clock.Now()}, } - return isomorphicEvent + // Make a copy of the Event to make sure that recording it + // doesn't mess with the object stored in cache. + return isomorphicEvent.DeepCopy() } e.eventCache[eventKey] = eventCopy - return eventCopy + // Make a copy of the Event to make sure that recording it doesn't + // mess with the object stored in cache. + return eventCopy.DeepCopy() }() if evToRecord != nil { - recordedEvent := e.attemptRecording(evToRecord) - if recordedEvent != nil { - recordedEventKey := getKey(recordedEvent) - e.mu.Lock() - defer e.mu.Unlock() - e.eventCache[recordedEventKey] = recordedEvent - } + // TODO: Add a metric counting the number of recording attempts + e.attemptRecording(evToRecord) + // We don't want the new recorded Event to be reflected in the + // client's cache because server-side mutations could mess with the + // aggregation mechanism used by the client. } }() } diff --git a/tools/events/eventseries_test.go b/tools/events/eventseries_test.go index 08cc9b26..b51811c9 100644 --- a/tools/events/eventseries_test.go +++ b/tools/events/eventseries_test.go @@ -17,6 +17,7 @@ limitations under the License. package events import ( + "context" "strconv" "testing" "time" @@ -29,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + fake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" @@ -185,6 +187,44 @@ func TestEventSeriesf(t *testing.T) { close(stopCh) } +// TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to +// an EventSink consecutively there is no data race. This test is meant to be +// run with the `-race` option. +func TestEventSeriesWithEventSinkImplRace(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + + eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()} + eventBroadcaster := NewBroadcaster(eventSink) + + stopCh := make(chan struct{}) + eventBroadcaster.StartRecordingToSink(stopCh) + + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "test") + + recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "") + recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "") + + err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { + events, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + + if len(events.Items) != 1 { + return false, nil + } + + if events.Items[0].Series == nil { + return false, nil + } + + return true, nil + }) + if err != nil { + t.Fatal("expected that 2 identical Eventf calls would result in the creation of an Event with a Serie") + } +} + func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) { recvEvent := *actualEvent