client-go: don't wait too long after EventBroadcaster.Shutdown

When Shutdown was called, delivery of each pending event would still be retried
12 times with a delay of ~10s between each retry. In apiserver integration
tests that caused the goroutine to linger long after the corresponding
apiserver of the test was shut down.

Kubernetes-commit: 15b01af9c18a0840d71e2bb7dff4d8c29b158aad
This commit is contained in:
Patrick Ohly 2023-02-03 14:52:20 +01:00 committed by Kubernetes Publisher
parent 00b9d76f44
commit f6a5a1f139
2 changed files with 73 additions and 22 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package record package record
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"time" "time"
@ -132,7 +133,9 @@ type EventBroadcaster interface {
// with the event source set to the given event source. // with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
// Shutdown shuts down the broadcaster // Shutdown shuts down the broadcaster. Once the broadcaster is shut
// down, it will only try to record an event in a sink once before
// giving up on it with an error message.
Shutdown() Shutdown()
} }
@ -157,31 +160,34 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re
// Creates a new event broadcaster. // Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster { func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{ return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
}
} }
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return &eventBroadcasterImpl{ return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration)
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: sleepDuration,
}
} }
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster { func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
return &eventBroadcasterImpl{ eventBroadcaster := newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), eventBroadcaster.options = options
sleepDuration: defaultSleepDuration, return eventBroadcaster
options: options,
} }
func newEventBroadcaster(broadcaster *watch.Broadcaster, sleepDuration time.Duration) *eventBroadcasterImpl {
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: broadcaster,
sleepDuration: sleepDuration,
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(context.Background())
return eventBroadcaster
} }
type eventBroadcasterImpl struct { type eventBroadcasterImpl struct {
*watch.Broadcaster *watch.Broadcaster
sleepDuration time.Duration sleepDuration time.Duration
options CorrelatorOptions options CorrelatorOptions
cancelationCtx context.Context
cancel func()
} }
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
@ -191,15 +197,16 @@ func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interf
eventCorrelator := NewEventCorrelatorWithOptions(e.options) eventCorrelator := NewEventCorrelatorWithOptions(e.options)
return e.StartEventWatcher( return e.StartEventWatcher(
func(event *v1.Event) { func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, e.sleepDuration) e.recordToSink(sink, event, eventCorrelator)
}) })
} }
func (e *eventBroadcasterImpl) Shutdown() { func (e *eventBroadcasterImpl) Shutdown() {
e.Broadcaster.Shutdown() e.Broadcaster.Shutdown()
e.cancel()
} }
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) { func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator) {
// Make a copy before modification, because there could be multiple listeners. // Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this. // Events are safe to copy like this.
eventCopy := *event eventCopy := *event
@ -221,12 +228,18 @@ func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrela
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break break
} }
// Randomize the first sleep so that various clients won't all be // Randomize the first sleep so that various clients won't all be
// synced up if the master goes down. // synced up if the master goes down.
delay := e.sleepDuration
if tries == 1 { if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) delay = time.Duration(float64(delay) * rand.Float64())
} else { }
time.Sleep(sleepDuration) select {
case <-e.cancelationCtx.Done():
klog.Errorf("Unable to write event '%#v' (broadcaster is shut down)", event)
return
case <-time.After(delay):
} }
} }
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package record package record
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
@ -452,7 +453,12 @@ func TestWriteEventError(t *testing.T) {
}, },
} }
ev := &v1.Event{} ev := &v1.Event{}
recordToSink(sink, ev, eventCorrelator, 0) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
e := eventBroadcasterImpl{
cancelationCtx: ctx,
}
e.recordToSink(sink, ev, eventCorrelator)
if attempts != ent.attemptsWanted { if attempts != ent.attemptsWanted {
t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts) t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts)
} }
@ -482,7 +488,12 @@ func TestUpdateExpiredEvent(t *testing.T) {
ev := &v1.Event{} ev := &v1.Event{}
ev.ResourceVersion = "updated-resource-version" ev.ResourceVersion = "updated-resource-version"
ev.Count = 2 ev.Count = 2
recordToSink(sink, ev, eventCorrelator, 0) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
e := eventBroadcasterImpl{
cancelationCtx: ctx,
}
e.recordToSink(sink, ev, eventCorrelator)
if createdEvent == nil { if createdEvent == nil {
t.Error("Event did not get created after patch failed") t.Error("Event did not get created after patch failed")
@ -494,6 +505,33 @@ func TestUpdateExpiredEvent(t *testing.T) {
} }
} }
func TestCancelEvent(t *testing.T) {
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
eventCorrelator := NewEventCorrelator(&clock)
attempts := 0
sink := &testEventSink{
OnCreate: func(event *v1.Event) (*v1.Event, error) {
attempts++
return nil, &errors.UnexpectedObjectError{}
},
}
ev := &v1.Event{}
// Cancel before even calling recordToSink.
ctx, cancel := context.WithCancel(context.Background())
cancel()
e := eventBroadcasterImpl{
cancelationCtx: ctx,
sleepDuration: time.Second,
}
e.recordToSink(sink, ev, eventCorrelator)
if attempts != 1 {
t.Errorf("recordToSink should have tried once, then given up immediately. Instead it tried %d times.", attempts)
}
}
func TestLotsOfEvents(t *testing.T) { func TestLotsOfEvents(t *testing.T) {
recorderCalled := make(chan struct{}) recorderCalled := make(chan struct{})
loggerCalled := make(chan struct{}) loggerCalled := make(chan struct{})