diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 658f69c7..0c27891a 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -464,7 +464,7 @@ }, { "ImportPath": "k8s.io/apimachinery", - "Rev": "0e9f9cff521d" + "Rev": "460d10991a52" }, { "ImportPath": "k8s.io/gengo", diff --git a/go.mod b/go.mod index 4238f406..646c7aa1 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e k8s.io/api v0.0.0-20210115125903-c873f2e8ab25 - k8s.io/apimachinery v0.0.0-20210116045657-0e9f9cff521d + k8s.io/apimachinery v0.0.0-20210121071119-460d10991a52 k8s.io/klog/v2 v2.4.0 k8s.io/utils v0.0.0-20201110183641-67b214c5f920 sigs.k8s.io/yaml v1.2.0 @@ -35,5 +35,5 @@ require ( replace ( k8s.io/api => k8s.io/api v0.0.0-20210115125903-c873f2e8ab25 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210116045657-0e9f9cff521d + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210121071119-460d10991a52 ) diff --git a/go.sum b/go.sum index e56a370e..0552341f 100644 --- a/go.sum +++ b/go.sum @@ -428,7 +428,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.0.0-20210115125903-c873f2e8ab25/go.mod h1:xpUvIW3IJYnKO2yMuT9r4zCZI1ppqiuEejNFI9eoqWo= -k8s.io/apimachinery v0.0.0-20210116045657-0e9f9cff521d/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU= +k8s.io/apimachinery v0.0.0-20210121071119-460d10991a52/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ= diff --git a/tools/record/event.go b/tools/record/event.go index 0bf20c1d..30a66601 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -155,21 +155,21 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re // Creates a new event broadcaster. func NewBroadcaster() EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: defaultSleepDuration, } } func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: sleepDuration, } } func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster { return &eventBroadcasterImpl{ - Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), + Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: defaultSleepDuration, options: options, } @@ -338,11 +338,14 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m event := recorder.makeEvent(ref, annotations, eventtype, reason, message) event.Source = recorder.source - go func() { - // NOTE: events should be a non-blocking operation - defer utilruntime.HandleCrash() - recorder.Action(watch.Added, event) - }() + // NOTE: events should be a non-blocking operation, but we also need to not + // put this in a goroutine, otherwise we'll race to write to a closed channel + // when we go to shut down this broadcaster. Just drop events if we get overloaded, + // and log an error if that happens (we've configured the broadcaster to drop + // outgoing events anyway). + if sent := recorder.ActionOrDrop(watch.Added, event); !sent { + klog.Errorf("unable to record event: too many queued events, dropped event %#v", event) + } } func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) { diff --git a/tools/record/event_test.go b/tools/record/event_test.go index 67b033d5..60182bb3 100644 --- a/tools/record/event_test.go +++ b/tools/record/event_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strconv" + "sync" "testing" "time" @@ -101,6 +102,29 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event) } } +func TestNonRacyShutdown(t *testing.T) { + // Attempt to simulate previously racy conditions, and ensure that no race + // occurs: Nominally, calling "Eventf" *followed by* shutdown from the same + // thread should be a safe operation, but it's not if we launch recorder.Action + // in a goroutine. + + caster := NewBroadcasterForTests(0) + clock := clock.NewFakeClock(time.Now()) + recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock) + + var wg sync.WaitGroup + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + recorder.Eventf(&v1.ObjectReference{}, v1.EventTypeNormal, "Started", "blah") + }() + } + + wg.Wait() + caster.Shutdown() +} + func TestEventf(t *testing.T) { testPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{