Merge pull request #114236 from dgrisonnet/fix-eventseries-race

Fix data race when emitting similar Events consecutively

Kubernetes-commit: ca858e0c961db6ef8b22ecc3e257a02757261ea1
This commit is contained in:
Kubernetes Publisher 2023-01-02 08:39:30 -08:00
commit 3e0d990533
2 changed files with 51 additions and 9 deletions

View File

@ -184,19 +184,21 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
Count: 1, Count: 1,
LastObservedTime: metav1.MicroTime{Time: clock.Now()}, LastObservedTime: metav1.MicroTime{Time: clock.Now()},
} }
return isomorphicEvent // Make a copy of the Event to make sure that recording it
// doesn't mess with the object stored in cache.
return isomorphicEvent.DeepCopy()
} }
e.eventCache[eventKey] = eventCopy e.eventCache[eventKey] = eventCopy
return eventCopy // Make a copy of the Event to make sure that recording it doesn't
// mess with the object stored in cache.
return eventCopy.DeepCopy()
}() }()
if evToRecord != nil { if evToRecord != nil {
recordedEvent := e.attemptRecording(evToRecord) // TODO: Add a metric counting the number of recording attempts
if recordedEvent != nil { e.attemptRecording(evToRecord)
recordedEventKey := getKey(recordedEvent) // We don't want the new recorded Event to be reflected in the
e.mu.Lock() // client's cache because server-side mutations could mess with the
defer e.mu.Unlock() // aggregation mechanism used by the client.
e.eventCache[recordedEventKey] = recordedEvent
}
} }
}() }()
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package events package events
import ( import (
"context"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -29,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime" k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
fake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference" ref "k8s.io/client-go/tools/reference"
@ -185,6 +187,44 @@ func TestEventSeriesf(t *testing.T) {
close(stopCh) close(stopCh)
} }
// TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to
// an EventSink consecutively there is no data race. This test is meant to be
// run with the `-race` option.
func TestEventSeriesWithEventSinkImplRace(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
eventBroadcaster := NewBroadcaster(eventSink)
stopCh := make(chan struct{})
eventBroadcaster.StartRecordingToSink(stopCh)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "test")
recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
events, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
if len(events.Items) != 1 {
return false, nil
}
if events.Items[0].Series == nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal("expected that 2 identical Eventf calls would result in the creation of an Event with a Serie")
}
}
func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) { func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) {
recvEvent := *actualEvent recvEvent := *actualEvent