mirror of https://github.com/kubernetes/client-go.git
tools/events: fix data race when emitting series
There was a data race in the recordToSink function that caused changes
to the events cache to be overridden when events were emitted
simultaneously via Eventf calls.
The race stems from the fact that when recording an Event, multiple
calls may update the cache simultaneously. The locking is optimized so
that after the cache is updated with the new Event, the lock is
released while the Event is recorded on the apiserver side, and the
cache is then locked again to store the value returned by the
apiserver.
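
For illustration, a minimal, runnable sketch of that split-lock pattern
follows; the Event type, cache map, and attemptRecording function are
hypothetical stand-ins for eventsv1.Event, the broadcaster's eventCache,
and the apiserver round trip, not the actual client-go code:

package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for eventsv1.Event.
type Event struct {
	Reason string
	Count  int32
}

var (
	mu    sync.Mutex
	cache = map[string]*Event{}
)

// attemptRecording stands in for the apiserver round trip; that call is
// slow, which is why the lock is released around it.
func attemptRecording(ev *Event) *Event {
	recorded := *ev // the server returns its own, possibly mutated, copy
	return &recorded
}

// record mirrors the pre-fix locking pattern in recordToSink: lock,
// update the cache, unlock, record to the apiserver, then lock again
// and overwrite the cache entry with the server's copy.
func record(key string, ev *Event) {
	mu.Lock()
	cache[key] = ev
	mu.Unlock()

	recorded := attemptRecording(ev) // cache is unlocked during this call

	mu.Lock()
	cache[key] = recorded // can clobber a concurrent update to this entry
	mu.Unlock()
}

func main() {
	record("default/foo", &Event{Reason: "reason", Count: 1})
	fmt.Println(cache["default/foo"].Reason)
}

Because the caller and the cache share the same object, and because the
entry is rewritten after the unlocked recording step, two goroutines
recording isomorphic Events can interleave exactly as described below.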
There are a few problems with this approach:
1. If two identical Events are emitted in quick succession, the changes
made for the second Event override those made for the first one. In
code, the following happens:
1. Eventf(ev1)
2. Eventf(ev2)
3. Lock cache
4. Set cache[getKey(ev1)] = &ev1
5. Unlock cache
6. Lock cache
7. Update cache[getKey(ev2)] = &ev1 + Series{Count: 1}
8. Unlock cache
9. Start attempting to record the first event &ev1 on the apiserver side.
This can be mitigated by recording a copy of the Event stored in the
cache instead of reusing the pointer from the cache (see the sketch
after this list).
2. When the Event has been recorded on the apiserver, the cache is
updated again with the value of the Event returned by the server. This
update overrides any changes made to the cache entry while the Event
was being recorded, since the cache was unlocked during that time. This
can lead to inconsistencies when dealing with EventSeries, since the
count may be overridden or the client might even try to record the
first isomorphic Event multiple times.
This could be mitigated with a lock of larger scope, but we shouldn't
reflect the Event returned by the apiserver in the cache in the first
place, since server-side mutations could mess with the aggregation,
either by letting manipulated values update a different cache entry or
by ending up with two cache entries for the same Event.
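
Extending the hypothetical types from the sketch above, both mitigations
amount to handing the recorder a deep copy of the cached object and
never writing the server's response back into the cache. This is a
simplified sketch of the idea, not the exact client-go change; the real
eventsv1.Event already has a generated DeepCopy method:

// DeepCopy is a hypothetical equivalent of the generated
// eventsv1.Event DeepCopy method.
func (e *Event) DeepCopy() *Event {
	out := *e
	return &out
}

// recordFixed mirrors the post-fix pattern: the cache keeps its own
// private object, the recorder works on a copy, and the Event returned
// by the apiserver is never written back into the cache.
func recordFixed(key string, ev *Event) {
	mu.Lock()
	cache[key] = ev
	toRecord := ev.DeepCopy() // the recorder can no longer race the cache
	mu.Unlock()

	attemptRecording(toRecord) // the server's copy is deliberately dropped
}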
Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com>
Kubernetes-commit: cfdd40b569d7630b9b31ddbe0557159b1f8b0f9e
Committed by: Kubernetes Publisher
Parent: b1a83534de
Commit: 08d548e149
@@ -184,19 +184,21 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
 					Count:            1,
 					LastObservedTime: metav1.MicroTime{Time: clock.Now()},
 				}
-				return isomorphicEvent
+				// Make a copy of the Event to make sure that recording it
+				// doesn't mess with the object stored in cache.
+				return isomorphicEvent.DeepCopy()
 			}
 			e.eventCache[eventKey] = eventCopy
-			return eventCopy
+			// Make a copy of the Event to make sure that recording it doesn't
+			// mess with the object stored in cache.
+			return eventCopy.DeepCopy()
 		}()
 		if evToRecord != nil {
-			recordedEvent := e.attemptRecording(evToRecord)
-			if recordedEvent != nil {
-				recordedEventKey := getKey(recordedEvent)
-				e.mu.Lock()
-				defer e.mu.Unlock()
-				e.eventCache[recordedEventKey] = recordedEvent
-			}
+			// TODO: Add a metric counting the number of recording attempts
+			e.attemptRecording(evToRecord)
+			// We don't want the new recorded Event to be reflected in the
+			// client's cache because server-side mutations could mess with the
+			// aggregation mechanism used by the client.
 		}
 	}()
 }
@@ -17,6 +17,7 @@ limitations under the License.
 package events

 import (
+	"context"
 	"strconv"
 	"testing"
 	"time"
@@ -29,6 +30,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	k8sruntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	fake "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/kubernetes/scheme"
 	restclient "k8s.io/client-go/rest"
 	ref "k8s.io/client-go/tools/reference"
@@ -185,6 +187,44 @@ func TestEventSeriesf(t *testing.T) {
 	close(stopCh)
 }

+// TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to
+// an EventSink consecutively there is no data race. This test is meant to be
+// run with the `-race` option.
+func TestEventSeriesWithEventSinkImplRace(t *testing.T) {
+	kubeClient := fake.NewSimpleClientset()
+
+	eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
+	eventBroadcaster := NewBroadcaster(eventSink)
+
+	stopCh := make(chan struct{})
+	eventBroadcaster.StartRecordingToSink(stopCh)
+
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "test")
+
+	recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
+	recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
+
+	err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
+		events, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
+		if err != nil {
+			return false, err
+		}
+
+		if len(events.Items) != 1 {
+			return false, nil
+		}
+
+		if events.Items[0].Series == nil {
+			return false, nil
+		}
+
+		return true, nil
+	})
+	if err != nil {
+		t.Fatal("expected that 2 identical Eventf calls would result in the creation of an Event with a Series")
+	}
+}
+
 func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) {
 	recvEvent := *actualEvent
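
Assuming the standard client-go layout, the new test can be exercised
with the race detector via something like
go test -race -run TestEventSeriesWithEventSinkImplRace ./tools/events/;
without -race it still verifies that two identical Eventf calls collapse
into a single Event carrying a Series.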