From f19a514ff52377ecf1273667bda774b35552fe81 Mon Sep 17 00:00:00 2001 From: Andrew Stoycos Date: Fri, 11 Feb 2022 14:50:19 -0500 Subject: [PATCH] Fix Panic Condition Currenlty an event recorder can send an event to a broadcaster that is already stopped, resulting in a panic. This ensures the broadcaster holds a lock while it is shutting down and then forces any senders to drop queued events following broadcaster shutdown. It also updates the Action, ActionOrDrop, Watch, and WatchWithPrefix functions to return an error in the case where data is sent on the closed bradcaster channel rather than panicing. Lastly it updates unit tests to ensure the fix works correctly fixes: https://github.com/kubernetes/kubernetes/issues/108518 Signed-off-by: Andrew Stoycos Kubernetes-commit: 6aa779f4ed3d3acdad2f2bf17fb27e11e23aabe4 --- tools/cache/testing/fake_controller_source.go | 4 ++-- tools/events/event_broadcaster.go | 10 +++++++++- tools/record/event.go | 12 ++++++++++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/tools/cache/testing/fake_controller_source.go b/tools/cache/testing/fake_controller_source.go index 6151582c..90b6e5cf 100644 --- a/tools/cache/testing/fake_controller_source.go +++ b/tools/cache/testing/fake_controller_source.go @@ -275,11 +275,11 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac // clients). changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()}) } - return f.Broadcaster.WatchWithPrefix(changes), nil + return f.Broadcaster.WatchWithPrefix(changes) } else if rc > f.lastRV { return nil, errors.New("resource version in the future not supported by this fake") } - return f.Broadcaster.Watch(), nil + return f.Broadcaster.Watch() } // Shutdown closes the underlying broadcaster, waiting for events to be diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go index 3dd3cea1..dd7e0aa1 100644 --- a/tools/events/event_broadcaster.go +++ b/tools/events/event_broadcaster.go @@ -307,7 +307,15 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. // The return value is used to stop recording func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() { - watcher := e.Watch() + watcher, err := e.Watch() + if err != nil { + klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err) + // TODO: Rewrite the function signature to return an error, for + // now just return a no-op function + return func() { + klog.Error("The event watcher failed to start") + } + } go func() { defer utilruntime.HandleCrash() for { diff --git a/tools/record/event.go b/tools/record/event.go index b901d2e8..a01f3288 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -298,7 +298,10 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. // The return value can be ignored or used to stop recording, if desired. func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface { - watcher := e.Watch() + watcher, err := e.Watch() + if err != nil { + klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err) + } go func() { defer utilruntime.HandleCrash() for watchEvent := range watcher.ResultChan() { @@ -346,7 +349,12 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m // when we go to shut down this broadcaster. Just drop events if we get overloaded, // and log an error if that happens (we've configured the broadcaster to drop // outgoing events anyway). - if sent := recorder.ActionOrDrop(watch.Added, event); !sent { + sent, err := recorder.ActionOrDrop(watch.Added, event) + if err != nil { + klog.Errorf("unable to record event: %v (will not retry!)", err) + return + } + if !sent { klog.Errorf("unable to record event: too many queued events, dropped event %#v", event) } }