Don't record events in goroutines

This changes the event recorder to use the equivalent of a select
statement instead of a goroutine to record events.

Previously, we used a goroutine to make event recording non-blocking.
Unfortunately, this writes to a channel, and during shutdown we then
race to write to a closed channel, panicing (caught by the error
handler, but still) and making the race detector unhappy.

Instead, we now use the select statement to make event emitting
non-blocking, and if we'd block, we just drop the event.  We already
drop events if a particular sink is overloaded, so this just moves the
incoming event queue to match that behavior (and makes the incoming
event queue much longer).

This means that, if the user uses `Eventf` and friends correctly (i.e.
ensure they've returned by the time we call `Shutdown`), it's
now safe to call Shutdown.  This matches the conventional go guidance on
channels: the writer should call close.

Kubernetes-commit: e90e67bd002e70a525d3ee9045b213a5d826074d
This commit is contained in:
Solly Ross
2020-10-16 15:05:00 -07:00
committed by Kubernetes Publisher
parent fc034b4b76
commit 726d27fe7a
2 changed files with 35 additions and 8 deletions

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"testing"
"time"
@@ -101,6 +102,29 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
}
}
func TestNonRacyShutdown(t *testing.T) {
// Attempt to simulate previously racy conditions, and ensure that no race
// occurs: Nominally, calling "Eventf" *followed by* shutdown from the same
// thread should be a safe operation, but it's not if we launch recorder.Action
// in a goroutine.
caster := NewBroadcasterForTests(0)
clock := clock.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock)
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
recorder.Eventf(&v1.ObjectReference{}, v1.EventTypeNormal, "Started", "blah")
}()
}
wg.Wait()
caster.Shutdown()
}
func TestEventf(t *testing.T) {
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{