From e90e67bd002e70a525d3ee9045b213a5d826074d Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Fri, 16 Oct 2020 15:05:00 -0700 Subject: [PATCH] Don't record events in goroutines This changes the event recorder to use the equivalent of a select statement instead of a goroutine to record events. Previously, we used a goroutine to make event recording non-blocking. Unfortunately, this writes to a channel, and during shutdown we then race to write to a closed channel, panicing (caught by the error handler, but still) and making the race detector unhappy. Instead, we now use the select statement to make event emitting non-blocking, and if we'd block, we just drop the event. We already drop events if a particular sink is overloaded, so this just moves the incoming event queue to match that behavior (and makes the incoming event queue much longer). This means that, if the user uses `Eventf` and friends correctly (i.e. ensure they've returned by the time we call `Shutdown`), it's now safe to call Shutdown. This matches the conventional go guidance on channels: the writer should call close. --- .../src/k8s.io/apimachinery/pkg/watch/mux.go | 28 +++++++++++++++++++ .../k8s.io/client-go/tools/record/event.go | 19 +++++++------ .../client-go/tools/record/event_test.go | 24 ++++++++++++++++ 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/mux.go b/staging/src/k8s.io/apimachinery/pkg/watch/mux.go index 0aaf01adce6..e01d519060b 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/mux.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/mux.go @@ -74,6 +74,22 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B return m } +// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster, +// except that the incoming queue is the same size as the outgoing queues +// (specified by queueLength). +func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { + m := &Broadcaster{ + watchers: map[int64]*broadcasterWatcher{}, + incoming: make(chan Event, queueLength), + stopped: make(chan struct{}), + watchQueueLength: queueLength, + fullChannelBehavior: fullChannelBehavior, + } + m.distributing.Add(1) + go m.loop() + return m +} + const internalRunFunctionMarker = "internal-do-function" // a function type we can shoehorn into the queue. @@ -198,6 +214,18 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) { m.incoming <- Event{action, obj} } +// Action distributes the given event among all watchers, or drops it on the floor +// if too many incoming actions are queued up. Returns true if the action was sent, +// false if dropped. +func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool { + select { + case m.incoming <- Event{action, obj}: + return true + default: + return false + } +} + // Shutdown disconnects all watchers (but any queued events will still be distributed). // You must not call Action or Watch* after calling Shutdown. This call blocks // until all events have been distributed through the outbound channels. Note diff --git a/staging/src/k8s.io/client-go/tools/record/event.go b/staging/src/k8s.io/client-go/tools/record/event.go index 48ef45bb5bc..451cc9e1461 100644 --- a/staging/src/k8s.io/client-go/tools/record/event.go +++ b/staging/src/k8s.io/client-go/tools/record/event.go @@ -155,21 +155,21 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re // Creates a new event broadcaster. func NewBroadcaster() EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: defaultSleepDuration, } } func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: sleepDuration, } } func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: defaultSleepDuration, options: options, } @@ -338,11 +338,14 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m event := recorder.makeEvent(ref, annotations, eventtype, reason, message) event.Source = recorder.source - go func() { - // NOTE: events should be a non-blocking operation - defer utilruntime.HandleCrash() - recorder.Action(watch.Added, event) - }() + // NOTE: events should be a non-blocking operation, but we also need to not + // put this in a goroutine, otherwise we'll race to write to a closed channel + // when we go to shut down this broadcaster. Just drop events if we get overloaded, + // and log an error if that happens (we've configured the broadcaster to drop + // outgoing events anyway). + if sent := recorder.ActionOrDrop(watch.Added, event); !sent { + klog.Errorf("unable to record event: too many queued events, dropped event %#v", event) + } } func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) { diff --git a/staging/src/k8s.io/client-go/tools/record/event_test.go b/staging/src/k8s.io/client-go/tools/record/event_test.go index 67b033d5f1c..60182bb3bcb 100644 --- a/staging/src/k8s.io/client-go/tools/record/event_test.go +++ b/staging/src/k8s.io/client-go/tools/record/event_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strconv" + "sync" "testing" "time" @@ -101,6 +102,29 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event) } } +func TestNonRacyShutdown(t *testing.T) { + // Attempt to simulate previously racy conditions, and ensure that no race + // occurs: Nominally, calling "Eventf" *followed by* shutdown from the same + // thread should be a safe operation, but it's not if we launch recorder.Action + // in a goroutine. + + caster := NewBroadcasterForTests(0) + clock := clock.NewFakeClock(time.Now()) + recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock) + + var wg sync.WaitGroup + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + recorder.Eventf(&v1.ObjectReference{}, v1.EventTypeNormal, "Started", "blah") + }() + } + + wg.Wait() + caster.Shutdown() +} + func TestEventf(t *testing.T) { testPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{