Fix Panic Condition

Currenlty an event recorder can send an event to a
broadcaster that is already stopped, resulting
in a panic.  This ensures the broadcaster holds
a lock while it is shutting down and then forces
any senders to drop queued events following
broadcaster shutdown.

It also updates the Action, ActionOrDrop,  Watch,
and WatchWithPrefix functions to return an error
in the case where data is sent on the closed bradcaster
channel rather than panicing.

Lastly it updates unit tests to ensure the fix works correctly

fixes: https://github.com/kubernetes/kubernetes/issues/108518

Signed-off-by: Andrew Stoycos <astoycos@redhat.com>
This commit is contained in:
Andrew Stoycos 2022-02-11 14:50:19 -05:00
parent 4717a59097
commit 6aa779f4ed
5 changed files with 113 additions and 54 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package watch
import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/runtime"
@ -44,8 +45,11 @@ type Broadcaster struct {
nextWatcher int64
distributing sync.WaitGroup
incoming chan Event
stopped chan struct{}
// incomingBlock allows us to ensure we don't race and end up sending events
// to a closed channel following a brodcaster shutdown.
incomingBlock sync.Mutex
incoming chan Event
stopped chan struct{}
// How large to make watcher's channel.
watchQueueLength int
@ -132,7 +136,7 @@ func (m *Broadcaster) blockQueue(f func()) {
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events. It will block until the watcher is actually added to the
// broadcaster.
func (m *Broadcaster) Watch() Interface {
func (m *Broadcaster) Watch() (Interface, error) {
var w *broadcasterWatcher
m.blockQueue(func() {
id := m.nextWatcher
@ -146,11 +150,9 @@ func (m *Broadcaster) Watch() Interface {
m.watchers[id] = w
})
if w == nil {
// The panic here is to be consistent with the previous interface behavior
// we are willing to re-evaluate in the future.
panic("broadcaster already stopped")
return nil, fmt.Errorf("broadcaster already stopped")
}
return w
return w, nil
}
// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
@ -158,7 +160,7 @@ func (m *Broadcaster) Watch() Interface {
// The returned watch will have a queue length that is at least large enough to accommodate
// all of the items in queuedEvents. It will block until the watcher is actually added to
// the broadcaster.
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) (Interface, error) {
var w *broadcasterWatcher
m.blockQueue(func() {
id := m.nextWatcher
@ -179,11 +181,9 @@ func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
}
})
if w == nil {
// The panic here is to be consistent with the previous interface behavior
// we are willing to re-evaluate in the future.
panic("broadcaster already stopped")
return nil, fmt.Errorf("broadcaster already stopped")
}
return w
return w, nil
}
// stopWatching stops the given watcher and removes it from the list.
@ -210,19 +210,38 @@ func (m *Broadcaster) closeAll() {
}
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
func (m *Broadcaster) Action(action EventType, obj runtime.Object) error {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
select {
case <-m.stopped:
return fmt.Errorf("broadcaster already stopped")
default:
}
m.incoming <- Event{action, obj}
return nil
}
// Action distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queued up. Returns true if the action was sent,
// false if dropped.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
// Ensure that if the broadcaster is stopped we do not send events to it.
select {
case <-m.stopped:
return false, fmt.Errorf("broadcaster already stopped")
default:
}
select {
case m.incoming <- Event{action, obj}:
return true
return true, nil
default:
return false
return false, nil
}
}
@ -233,6 +252,8 @@ func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
// have received the data yet as it can remain sitting in the buffered
// channel. It will block until the broadcaster stop request is actually executed
func (m *Broadcaster) Shutdown() {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
m.blockQueue(func() {
close(m.stopped)
close(m.incoming)

View File

@ -22,6 +22,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
@ -58,6 +59,10 @@ func TestBroadcaster(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(testWatchers)
for i := 0; i < testWatchers; i++ {
w, err := m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
// Verify that each watcher gets the events in the correct order
go func(watcher int, w Interface) {
tableLine := 0
@ -75,7 +80,7 @@ func TestBroadcaster(t *testing.T) {
tableLine++
}
wg.Done()
}(i, m.Watch())
}(i, w)
}
for i, item := range table {
@ -90,8 +95,14 @@ func TestBroadcaster(t *testing.T) {
func TestBroadcasterWatcherClose(t *testing.T) {
m := NewBroadcaster(0, WaitIfChannelFull)
w := m.Watch()
w2 := m.Watch()
w, err := m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
w2, err := m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
w.Stop()
m.Shutdown()
if _, open := <-w.ResultChan(); open {
@ -108,6 +119,14 @@ func TestBroadcasterWatcherClose(t *testing.T) {
func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
done := make(chan bool)
m := NewBroadcaster(0, WaitIfChannelFull)
w, err := m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
w2, err := m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
go func(w0, w1 Interface) {
// We know Broadcaster is in the distribute loop once one watcher receives
// an event. Stop the other watcher while distribute is trying to
@ -119,7 +138,7 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
w0.Stop()
}
close(done)
}(m.Watch(), m.Watch())
}(w, w2)
m.Action(Added, &myType{})
select {
case <-time.After(wait.ForeverTestTimeout):
@ -137,8 +156,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
// Add a couple watchers
watches := make([]Interface, 2)
var err error
for i := range watches {
watches[i] = m.Watch()
watches[i], err = m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
}
// Send a couple events before closing the broadcast channel.
@ -194,33 +217,32 @@ func TestBroadcasterWatchAfterShutdown(t *testing.T) {
m := NewBroadcaster(0, WaitIfChannelFull)
m.Shutdown()
watch := func() {
defer func() {
if err := recover(); err == nil {
t.Error("should cause panic")
}
}()
m.Watch()
}
watch()
_, err := m.Watch()
assert.EqualError(t, err, "broadcaster already stopped", "Watch should report error id broadcaster is shutdown")
watchWithPrefix := func() {
defer func() {
if err := recover(); err == nil {
t.Error("should cause panic")
}
}()
m.WatchWithPrefix([]Event{event1, event2})
}
watchWithPrefix()
action := func() {
defer func() {
if err := recover(); err == nil {
t.Error("should cause panic")
}
}()
m.Action(event1.Type, event1.Object)
}
action()
_, err = m.WatchWithPrefix([]Event{event1, event2})
assert.EqualError(t, err, "broadcaster already stopped", "WatchWithPrefix should report error id broadcaster is shutdown")
}
func TestBroadcasterSendEventAfterShutdown(t *testing.T) {
m := NewBroadcaster(1, DropIfChannelFull)
event := Event{Type: Added, Object: &myType{"foo", "hello world"}}
// Add a couple watchers
watches := make([]Interface, 2)
for i := range watches {
watches[i], _ = m.Watch()
}
m.Shutdown()
// Send a couple events after closing the broadcast channel.
t.Log("Sending event")
err := m.Action(event.Type, event.Object)
assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown")
sendOnClosed, err := m.ActionOrDrop(event.Type, event.Object)
assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown")
assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown")
}

View File

@ -275,11 +275,11 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac
// clients).
changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()})
}
return f.Broadcaster.WatchWithPrefix(changes), nil
return f.Broadcaster.WatchWithPrefix(changes)
} else if rc > f.lastRV {
return nil, errors.New("resource version in the future not supported by this fake")
}
return f.Broadcaster.Watch(), nil
return f.Broadcaster.Watch()
}
// Shutdown closes the underlying broadcaster, waiting for events to be

View File

@ -307,7 +307,15 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
watcher := e.Watch()
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
// TODO: Rewrite the function signature to return an error, for
// now just return a no-op function
return func() {
klog.Error("The event watcher failed to start")
}
}
go func() {
defer utilruntime.HandleCrash()
for {

View File

@ -298,7 +298,10 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher := e.Watch()
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
}
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
@ -346,7 +349,12 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
// and log an error if that happens (we've configured the broadcaster to drop
// outgoing events anyway).
if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
sent, err := recorder.ActionOrDrop(watch.Added, event)
if err != nil {
klog.Errorf("unable to record event: %v (will not retry!)", err)
return
}
if !sent {
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
}
}