diff --git a/tools/cache/testing/fake_controller_source.go b/tools/cache/testing/fake_controller_source.go index 6151582c..90b6e5cf 100644 --- a/tools/cache/testing/fake_controller_source.go +++ b/tools/cache/testing/fake_controller_source.go @@ -275,11 +275,11 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac // clients). changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()}) } - return f.Broadcaster.WatchWithPrefix(changes), nil + return f.Broadcaster.WatchWithPrefix(changes) } else if rc > f.lastRV { return nil, errors.New("resource version in the future not supported by this fake") } - return f.Broadcaster.Watch(), nil + return f.Broadcaster.Watch() } // Shutdown closes the underlying broadcaster, waiting for events to be diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go index 3dd3cea1..dd7e0aa1 100644 --- a/tools/events/event_broadcaster.go +++ b/tools/events/event_broadcaster.go @@ -307,7 +307,15 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. // The return value is used to stop recording func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() { - watcher := e.Watch() + watcher, err := e.Watch() + if err != nil { + klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err) + // TODO: Rewrite the function signature to return an error, for + // now just return a no-op function + return func() { + klog.Error("The event watcher failed to start") + } + } go func() { defer utilruntime.HandleCrash() for { diff --git a/tools/record/event.go b/tools/record/event.go index b901d2e8..a01f3288 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -298,7 +298,10 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. // The return value can be ignored or used to stop recording, if desired. func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface { - watcher := e.Watch() + watcher, err := e.Watch() + if err != nil { + klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err) + } go func() { defer utilruntime.HandleCrash() for watchEvent := range watcher.ResultChan() { @@ -346,7 +349,12 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m // when we go to shut down this broadcaster. Just drop events if we get overloaded, // and log an error if that happens (we've configured the broadcaster to drop // outgoing events anyway). - if sent := recorder.ActionOrDrop(watch.Added, event); !sent { + sent, err := recorder.ActionOrDrop(watch.Added, event) + if err != nil { + klog.Errorf("unable to record event: %v (will not retry!)", err) + return + } + if !sent { klog.Errorf("unable to record event: too many queued events, dropped event %#v", event) } }