mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #95664 from DirectXMan12/bug/non-racy-recorder-shutdown
Don't spawn a goroutine for every event recording
This commit is contained in:
commit
9d99dbc357
@ -74,6 +74,22 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
|
|||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster,
|
||||||
|
// except that the incoming queue is the same size as the outgoing queues
|
||||||
|
// (specified by queueLength).
|
||||||
|
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
|
||||||
|
m := &Broadcaster{
|
||||||
|
watchers: map[int64]*broadcasterWatcher{},
|
||||||
|
incoming: make(chan Event, queueLength),
|
||||||
|
stopped: make(chan struct{}),
|
||||||
|
watchQueueLength: queueLength,
|
||||||
|
fullChannelBehavior: fullChannelBehavior,
|
||||||
|
}
|
||||||
|
m.distributing.Add(1)
|
||||||
|
go m.loop()
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
const internalRunFunctionMarker = "internal-do-function"
|
const internalRunFunctionMarker = "internal-do-function"
|
||||||
|
|
||||||
// a function type we can shoehorn into the queue.
|
// a function type we can shoehorn into the queue.
|
||||||
@ -198,6 +214,18 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
|
|||||||
m.incoming <- Event{action, obj}
|
m.incoming <- Event{action, obj}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Action distributes the given event among all watchers, or drops it on the floor
|
||||||
|
// if too many incoming actions are queued up. Returns true if the action was sent,
|
||||||
|
// false if dropped.
|
||||||
|
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
|
||||||
|
select {
|
||||||
|
case m.incoming <- Event{action, obj}:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Shutdown disconnects all watchers (but any queued events will still be distributed).
|
// Shutdown disconnects all watchers (but any queued events will still be distributed).
|
||||||
// You must not call Action or Watch* after calling Shutdown. This call blocks
|
// You must not call Action or Watch* after calling Shutdown. This call blocks
|
||||||
// until all events have been distributed through the outbound channels. Note
|
// until all events have been distributed through the outbound channels. Note
|
||||||
|
@ -155,21 +155,21 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re
|
|||||||
// Creates a new event broadcaster.
|
// Creates a new event broadcaster.
|
||||||
func NewBroadcaster() EventBroadcaster {
|
func NewBroadcaster() EventBroadcaster {
|
||||||
return &eventBroadcasterImpl{
|
return &eventBroadcasterImpl{
|
||||||
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
||||||
sleepDuration: defaultSleepDuration,
|
sleepDuration: defaultSleepDuration,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
|
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
|
||||||
return &eventBroadcasterImpl{
|
return &eventBroadcasterImpl{
|
||||||
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
||||||
sleepDuration: sleepDuration,
|
sleepDuration: sleepDuration,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
|
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
|
||||||
return &eventBroadcasterImpl{
|
return &eventBroadcasterImpl{
|
||||||
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
||||||
sleepDuration: defaultSleepDuration,
|
sleepDuration: defaultSleepDuration,
|
||||||
options: options,
|
options: options,
|
||||||
}
|
}
|
||||||
@ -338,11 +338,14 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
|
|||||||
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
|
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
|
||||||
event.Source = recorder.source
|
event.Source = recorder.source
|
||||||
|
|
||||||
go func() {
|
// NOTE: events should be a non-blocking operation, but we also need to not
|
||||||
// NOTE: events should be a non-blocking operation
|
// put this in a goroutine, otherwise we'll race to write to a closed channel
|
||||||
defer utilruntime.HandleCrash()
|
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
|
||||||
recorder.Action(watch.Added, event)
|
// and log an error if that happens (we've configured the broadcaster to drop
|
||||||
}()
|
// outgoing events anyway).
|
||||||
|
if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
|
||||||
|
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
|
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -101,6 +102,29 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNonRacyShutdown(t *testing.T) {
|
||||||
|
// Attempt to simulate previously racy conditions, and ensure that no race
|
||||||
|
// occurs: Nominally, calling "Eventf" *followed by* shutdown from the same
|
||||||
|
// thread should be a safe operation, but it's not if we launch recorder.Action
|
||||||
|
// in a goroutine.
|
||||||
|
|
||||||
|
caster := NewBroadcasterForTests(0)
|
||||||
|
clock := clock.NewFakeClock(time.Now())
|
||||||
|
recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(100)
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
recorder.Eventf(&v1.ObjectReference{}, v1.EventTypeNormal, "Started", "blah")
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
caster.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
func TestEventf(t *testing.T) {
|
func TestEventf(t *testing.T) {
|
||||||
testPod := &v1.Pod{
|
testPod := &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Loading…
Reference in New Issue
Block a user