From f6a5a1f1391b9b6ceaaa499bd7cebf508d5e02a6 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 3 Feb 2023 14:52:20 +0100 Subject: [PATCH] client-go: don't wait too long after EventBroadcaster.Shutdown When Shutdown was called, delivery of each pending event would still be retried 12 times with a delay of ~10s between each retry. In apiserver integration tests that caused the goroutine to linger long after the corresponding apiserver of the test was shut down. Kubernetes-commit: 15b01af9c18a0840d71e2bb7dff4d8c29b158aad --- tools/record/event.go | 53 ++++++++++++++++++++++++-------------- tools/record/event_test.go | 42 ++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 22 deletions(-) diff --git a/tools/record/event.go b/tools/record/event.go index 998bf8df..4899b362 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -17,6 +17,7 @@ limitations under the License. package record import ( + "context" "fmt" "math/rand" "time" @@ -132,7 +133,9 @@ type EventBroadcaster interface { // with the event source set to the given event source. NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder - // Shutdown shuts down the broadcaster + // Shutdown shuts down the broadcaster. Once the broadcaster is shut + // down, it will only try to record an event in a sink once before + // giving up on it with an error message. Shutdown() } @@ -157,31 +160,34 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re // Creates a new event broadcaster. func NewBroadcaster() EventBroadcaster { - return &eventBroadcasterImpl{ - Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), - sleepDuration: defaultSleepDuration, - } + return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration) } func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { - return &eventBroadcasterImpl{ - Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), - sleepDuration: sleepDuration, - } + return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration) } func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster { - return &eventBroadcasterImpl{ - Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), - sleepDuration: defaultSleepDuration, - options: options, + eventBroadcaster := newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration) + eventBroadcaster.options = options + return eventBroadcaster +} + +func newEventBroadcaster(broadcaster *watch.Broadcaster, sleepDuration time.Duration) *eventBroadcasterImpl { + eventBroadcaster := &eventBroadcasterImpl{ + Broadcaster: broadcaster, + sleepDuration: sleepDuration, } + eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(context.Background()) + return eventBroadcaster } type eventBroadcasterImpl struct { *watch.Broadcaster - sleepDuration time.Duration - options CorrelatorOptions + sleepDuration time.Duration + options CorrelatorOptions + cancelationCtx context.Context + cancel func() } // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. @@ -191,15 +197,16 @@ func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interf eventCorrelator := NewEventCorrelatorWithOptions(e.options) return e.StartEventWatcher( func(event *v1.Event) { - recordToSink(sink, event, eventCorrelator, e.sleepDuration) + e.recordToSink(sink, event, eventCorrelator) }) } func (e *eventBroadcasterImpl) Shutdown() { e.Broadcaster.Shutdown() + e.cancel() } -func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) { +func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event @@ -221,12 +228,18 @@ func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrela klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) break } + // Randomize the first sleep so that various clients won't all be // synced up if the master goes down. + delay := e.sleepDuration if tries == 1 { - time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) - } else { - time.Sleep(sleepDuration) + delay = time.Duration(float64(delay) * rand.Float64()) + } + select { + case <-e.cancelationCtx.Done(): + klog.Errorf("Unable to write event '%#v' (broadcaster is shut down)", event) + return + case <-time.After(delay): } } } diff --git a/tools/record/event_test.go b/tools/record/event_test.go index 54435bca..2513638f 100644 --- a/tools/record/event_test.go +++ b/tools/record/event_test.go @@ -17,6 +17,7 @@ limitations under the License. package record import ( + "context" "encoding/json" "fmt" "net/http" @@ -452,7 +453,12 @@ func TestWriteEventError(t *testing.T) { }, } ev := &v1.Event{} - recordToSink(sink, ev, eventCorrelator, 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + e := eventBroadcasterImpl{ + cancelationCtx: ctx, + } + e.recordToSink(sink, ev, eventCorrelator) if attempts != ent.attemptsWanted { t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts) } @@ -482,7 +488,12 @@ func TestUpdateExpiredEvent(t *testing.T) { ev := &v1.Event{} ev.ResourceVersion = "updated-resource-version" ev.Count = 2 - recordToSink(sink, ev, eventCorrelator, 0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + e := eventBroadcasterImpl{ + cancelationCtx: ctx, + } + e.recordToSink(sink, ev, eventCorrelator) if createdEvent == nil { t.Error("Event did not get created after patch failed") @@ -494,6 +505,33 @@ func TestUpdateExpiredEvent(t *testing.T) { } } +func TestCancelEvent(t *testing.T) { + clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second} + eventCorrelator := NewEventCorrelator(&clock) + + attempts := 0 + sink := &testEventSink{ + OnCreate: func(event *v1.Event) (*v1.Event, error) { + attempts++ + return nil, &errors.UnexpectedObjectError{} + }, + } + + ev := &v1.Event{} + + // Cancel before even calling recordToSink. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + e := eventBroadcasterImpl{ + cancelationCtx: ctx, + sleepDuration: time.Second, + } + e.recordToSink(sink, ev, eventCorrelator) + if attempts != 1 { + t.Errorf("recordToSink should have tried once, then given up immediately. Instead it tried %d times.", attempts) + } +} + func TestLotsOfEvents(t *testing.T) { recorderCalled := make(chan struct{}) loggerCalled := make(chan struct{})