mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-01 15:17:19 +00:00
Fix Panic Condition
Currenlty an event recorder can send an event to a broadcaster that is already stopped, resulting in a panic. This ensures the broadcaster holds a lock while it is shutting down and then forces any senders to drop queued events following broadcaster shutdown. It also updates the Action, ActionOrDrop, Watch, and WatchWithPrefix functions to return an error in the case where data is sent on the closed bradcaster channel rather than panicing. Lastly it updates unit tests to ensure the fix works correctly fixes: https://github.com/kubernetes/kubernetes/issues/108518 Signed-off-by: Andrew Stoycos <astoycos@redhat.com> Kubernetes-commit: 6aa779f4ed3d3acdad2f2bf17fb27e11e23aabe4
This commit is contained in:
committed by
Kubernetes Publisher
parent
b4b8e5e1e5
commit
f19a514ff5
@@ -275,11 +275,11 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac
|
|||||||
// clients).
|
// clients).
|
||||||
changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()})
|
changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()})
|
||||||
}
|
}
|
||||||
return f.Broadcaster.WatchWithPrefix(changes), nil
|
return f.Broadcaster.WatchWithPrefix(changes)
|
||||||
} else if rc > f.lastRV {
|
} else if rc > f.lastRV {
|
||||||
return nil, errors.New("resource version in the future not supported by this fake")
|
return nil, errors.New("resource version in the future not supported by this fake")
|
||||||
}
|
}
|
||||||
return f.Broadcaster.Watch(), nil
|
return f.Broadcaster.Watch()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown closes the underlying broadcaster, waiting for events to be
|
// Shutdown closes the underlying broadcaster, waiting for events to be
|
||||||
|
@@ -307,7 +307,15 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func
|
|||||||
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
||||||
// The return value is used to stop recording
|
// The return value is used to stop recording
|
||||||
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
|
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
|
||||||
watcher := e.Watch()
|
watcher, err := e.Watch()
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||||
|
// TODO: Rewrite the function signature to return an error, for
|
||||||
|
// now just return a no-op function
|
||||||
|
return func() {
|
||||||
|
klog.Error("The event watcher failed to start")
|
||||||
|
}
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
for {
|
for {
|
||||||
|
@@ -298,7 +298,10 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc
|
|||||||
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
||||||
// The return value can be ignored or used to stop recording, if desired.
|
// The return value can be ignored or used to stop recording, if desired.
|
||||||
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
|
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
|
||||||
watcher := e.Watch()
|
watcher, err := e.Watch()
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
for watchEvent := range watcher.ResultChan() {
|
for watchEvent := range watcher.ResultChan() {
|
||||||
@@ -346,7 +349,12 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
|
|||||||
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
|
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
|
||||||
// and log an error if that happens (we've configured the broadcaster to drop
|
// and log an error if that happens (we've configured the broadcaster to drop
|
||||||
// outgoing events anyway).
|
// outgoing events anyway).
|
||||||
if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
|
sent, err := recorder.ActionOrDrop(watch.Added, event)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("unable to record event: %v (will not retry!)", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !sent {
|
||||||
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
|
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user