diff --git a/tools/record/event.go b/tools/record/event.go index 0bf20c1d..30a66601 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -155,21 +155,21 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re // Creates a new event broadcaster. func NewBroadcaster() EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: defaultSleepDuration, } } func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: sleepDuration, } } func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: defaultSleepDuration, options: options, } @@ -338,11 +338,14 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m event := recorder.makeEvent(ref, annotations, eventtype, reason, message) event.Source = recorder.source - go func() { - // NOTE: events should be a non-blocking operation - defer utilruntime.HandleCrash() - recorder.Action(watch.Added, event) - }() + // NOTE: events should be a non-blocking operation, but we also need to not + // put this in a goroutine, otherwise we'll race to write to a closed channel + // when we go to shut down this broadcaster. Just drop events if we get overloaded, + // and log an error if that happens (we've configured the broadcaster to drop + // outgoing events anyway). + if sent := recorder.ActionOrDrop(watch.Added, event); !sent { + klog.Errorf("unable to record event: too many queued events, dropped event %#v", event) + } } func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) { diff --git a/tools/record/event_test.go b/tools/record/event_test.go index 67b033d5..60182bb3 100644 --- a/tools/record/event_test.go +++ b/tools/record/event_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strconv" + "sync" "testing" "time" @@ -101,6 +102,29 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event) } } +func TestNonRacyShutdown(t *testing.T) { + // Attempt to simulate previously racy conditions, and ensure that no race + // occurs: Nominally, calling "Eventf" *followed by* shutdown from the same + // thread should be a safe operation, but it's not if we launch recorder.Action + // in a goroutine. + + caster := NewBroadcasterForTests(0) + clock := clock.NewFakeClock(time.Now()) + recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock) + + var wg sync.WaitGroup + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + recorder.Eventf(&v1.ObjectReference{}, v1.EventTypeNormal, "Started", "blah") + }() + } + + wg.Wait() + caster.Shutdown() +} + func TestEventf(t *testing.T) { testPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{