k8s.io/client-go/tools: support context for event recording

Using StartRecordingToSinkWithContext instead of StartRecordingToSink and
StartLogging instead of StartStructuredLogging has several advantages:

- Spawned goroutines no longer get stuck for extended periods of
  time during shutdown when passing in a context that gets canceled.
- Log output can be directed towards a specific logger instead of the global
  default, for example one which writes to a testing.T instance.
- The new methods return an error when something went wrong instead of
  merely recording the error.

That last point is the reason for deprecating the old methods instead of merely
adding new alternatives.

Setting a context when constructing an EventBroadcaster makes calling Shutdown
optional. It can also be used to specify the logger.

Both EventRecorder interfaces in tools/events and tools/record now have a
WithLogger helper. Using that method is optional, but recommended to support
contextual logging properly. Without it, errors that occur while emitting an
event are not associated with the caller.

Kubernetes-commit: 27a68aee3a48340f7c14235f7fc24aa69aaeb8f6
This commit is contained in:
Patrick Ohly
2023-09-18 09:13:22 +02:00
committed by Kubernetes Publisher
parent ca718630c6
commit 3595e5242a
10 changed files with 338 additions and 122 deletions

View File

@@ -29,6 +29,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
internalevents "k8s.io/client-go/tools/internal/events"
"k8s.io/client-go/tools/record/util"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
@@ -110,6 +111,21 @@ type EventRecorder interface {
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
// EventRecorderLogger extends EventRecorder such that a logger can
// be set for methods in EventRecorder. Normally, those methods
// uses the global default logger to record errors and debug messages.
// If that is not desired, use WithLogger to provide a logger instance.
type EventRecorderLogger interface {
EventRecorder
// WithLogger replaces the context used for logging. This is a cheap call
// and meant to be used for contextual logging:
// recorder := ...
// logger := klog.FromContext(ctx)
// recorder.WithLogger(logger).Eventf(...)
WithLogger(logger klog.Logger) EventRecorderLogger
}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartEventWatcher starts sending events received from this EventBroadcaster to the given
@@ -131,7 +147,7 @@ type EventBroadcaster interface {
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger
// Shutdown shuts down the broadcaster. Once the broadcaster is shut
// down, it will only try to record an event in a sink once before
@@ -142,12 +158,14 @@ type EventBroadcaster interface {
// EventRecorderAdapter is a wrapper around a "k8s.io/client-go/tools/record".EventRecorder
// implementing the new "k8s.io/client-go/tools/events".EventRecorder interface.
type EventRecorderAdapter struct {
recorder EventRecorder
recorder EventRecorderLogger
}
var _ internalevents.EventRecorder = &EventRecorderAdapter{}
// NewEventRecorderAdapter returns an adapter implementing the new
// "k8s.io/client-go/tools/events".EventRecorder interface.
func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter {
func NewEventRecorderAdapter(recorder EventRecorderLogger) *EventRecorderAdapter {
return &EventRecorderAdapter{
recorder: recorder,
}
@@ -158,28 +176,76 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re
a.recorder.Eventf(regarding, eventtype, reason, note, args...)
}
func (a *EventRecorderAdapter) WithLogger(logger klog.Logger) internalevents.EventRecorderLogger {
return &EventRecorderAdapter{
recorder: a.recorder.WithLogger(logger),
}
}
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
c := config{
sleepDuration: defaultSleepDuration,
}
for _, opt := range opts {
opt(&c)
}
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: c.sleepDuration,
options: c.CorrelatorOptions,
}
ctx := c.Context
if ctx == nil {
ctx = context.Background()
} else {
// Calling Shutdown is not required when a context was provided:
// when the context is canceled, this goroutine will shut down
// the broadcaster.
go func() {
<-ctx.Done()
eventBroadcaster.Broadcaster.Shutdown()
}()
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
return eventBroadcaster
}
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration)
return NewBroadcaster(WithSleepDuration(sleepDuration))
}
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
eventBroadcaster := newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration)
eventBroadcaster.options = options
return eventBroadcaster
return NewBroadcaster(WithCorrelatorOptions(options))
}
func newEventBroadcaster(broadcaster *watch.Broadcaster, sleepDuration time.Duration) *eventBroadcasterImpl {
eventBroadcaster := &eventBroadcasterImpl{
Broadcaster: broadcaster,
sleepDuration: sleepDuration,
func WithCorrelatorOptions(options CorrelatorOptions) BroadcasterOption {
return func(c *config) {
c.CorrelatorOptions = options
}
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(context.Background())
return eventBroadcaster
}
// WithContext sets a context for the broadcaster. Canceling the context will
// shut down the broadcaster, Shutdown doesn't need to be called. The context
// can also be used to provide a logger.
func WithContext(ctx context.Context) BroadcasterOption {
return func(c *config) {
c.Context = ctx
}
}
func WithSleepDuration(sleepDuration time.Duration) BroadcasterOption {
return func(c *config) {
c.sleepDuration = sleepDuration
}
}
type BroadcasterOption func(*config)
type config struct {
CorrelatorOptions
context.Context
sleepDuration time.Duration
}
type eventBroadcasterImpl struct {
@@ -220,12 +286,12 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
}
tries := 0
for {
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
if recordEvent(e.cancelationCtx, sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
break
}
@@ -237,7 +303,7 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
}
select {
case <-e.cancelationCtx.Done():
klog.Errorf("Unable to write event '%#v' (broadcaster is shut down)", event)
klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (broadcaster is shut down)", "event", event)
return
case <-time.After(delay):
}
@@ -248,7 +314,7 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
func recordEvent(ctx context.Context, sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
var newEvent *v1.Event
var err error
if updateExistingEvent {
@@ -271,13 +337,13 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
return true
case *errors.StatusError:
if errors.IsAlreadyExists(err) || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
}
return true
case *errors.UnexpectedObjectError:
@@ -286,7 +352,7 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%#v': '%v'(may retry after sleeping)", event, err)
klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)", "event", event)
return false
}
@@ -299,12 +365,15 @@ func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...int
})
}
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// StartStructuredLogging starts sending events received from this EventBroadcaster to a structured logger.
// The logger is retrieved from a context if the broadcaster was constructed with a context, otherwise
// the global default is used.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
loggerV := klog.FromContext(e.cancelationCtx).V(int(verbosity))
return e.StartEventWatcher(
func(e *v1.Event) {
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
loggerV.Info("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
})
}
@@ -313,26 +382,32 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)")
}
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
for {
select {
case <-e.cancelationCtx.Done():
watcher.Stop()
return
case watchEvent := <-watcher.ResultChan():
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
eventHandler(event)
}
eventHandler(event)
}
}()
return watcher
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
}
type recorderImpl struct {
@@ -342,15 +417,17 @@ type recorderImpl struct {
clock clock.PassiveClock
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
var _ EventRecorder = &recorderImpl{}
func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message)
return
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
return
}
@@ -367,16 +444,16 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
// outgoing events anyway).
sent, err := recorder.ActionOrDrop(watch.Added, event)
if err != nil {
klog.Errorf("unable to record event: %v (will not retry!)", err)
logger.Error(err, "Unable to record event (will not retry!)")
return
}
if !sent {
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event)
}
}
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, eventtype, reason, message)
recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
}
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
@@ -384,7 +461,7 @@ func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, m
}
func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
recorder.generateEvent(klog.Background(), object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
@@ -408,3 +485,26 @@ func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map
Type: eventtype,
}
}
type recorderImplLogger struct {
*recorderImpl
logger klog.Logger
}
var _ EventRecorderLogger = &recorderImplLogger{}
func (recorder recorderImplLogger) Event(object runtime.Object, eventtype, reason, message string) {
recorder.recorderImpl.generateEvent(recorder.logger, object, nil, eventtype, reason, message)
}
func (recorder recorderImplLogger) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder recorderImplLogger) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(recorder.logger, object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (recorder recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
return recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
}