Merge pull request #95664 from DirectXMan12/bug/non-racy-recorder-shutdown

Don't spawn a goroutine for every event recording

Kubernetes-commit: 9d99dbc357ed85bcc963fc4bab8a4a3089c910b2
This commit is contained in:
Kubernetes Publisher 2021-01-20 21:25:00 -08:00
commit b72204b244
5 changed files with 39 additions and 12 deletions

2
Godeps/Godeps.json generated
View File

@ -464,7 +464,7 @@
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "0e9f9cff521d"
"Rev": "460d10991a52"
},
{
"ImportPath": "k8s.io/gengo",

4
go.mod
View File

@ -27,7 +27,7 @@ require (
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
k8s.io/api v0.0.0-20210115125903-c873f2e8ab25
k8s.io/apimachinery v0.0.0-20210116045657-0e9f9cff521d
k8s.io/apimachinery v0.0.0-20210121071119-460d10991a52
k8s.io/klog/v2 v2.4.0
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
sigs.k8s.io/yaml v1.2.0
@ -35,5 +35,5 @@ require (
replace (
k8s.io/api => k8s.io/api v0.0.0-20210115125903-c873f2e8ab25
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210116045657-0e9f9cff521d
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210121071119-460d10991a52
)

2
go.sum
View File

@ -428,7 +428,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210115125903-c873f2e8ab25/go.mod h1:xpUvIW3IJYnKO2yMuT9r4zCZI1ppqiuEejNFI9eoqWo=
k8s.io/apimachinery v0.0.0-20210116045657-0e9f9cff521d/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU=
k8s.io/apimachinery v0.0.0-20210121071119-460d10991a52/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ=

View File

@ -155,21 +155,21 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
}
}
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: sleepDuration,
}
}
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
options: options,
}
@ -338,11 +338,14 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source
go func() {
// NOTE: events should be a non-blocking operation
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
// NOTE: events should be a non-blocking operation, but we also need to not
// put this in a goroutine, otherwise we'll race to write to a closed channel
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
// and log an error if that happens (we've configured the broadcaster to drop
// outgoing events anyway).
if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
}
}
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"testing"
"time"
@ -101,6 +102,29 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
}
}
func TestNonRacyShutdown(t *testing.T) {
// Attempt to simulate previously racy conditions, and ensure that no race
// occurs: Nominally, calling "Eventf" *followed by* shutdown from the same
// thread should be a safe operation, but it's not if we launch recorder.Action
// in a goroutine.
caster := NewBroadcasterForTests(0)
clock := clock.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock)
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
recorder.Eventf(&v1.ObjectReference{}, v1.EventTypeNormal, "Started", "blah")
}()
}
wg.Wait()
caster.Shutdown()
}
func TestEventf(t *testing.T) {
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{