diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/mux.go b/staging/src/k8s.io/apimachinery/pkg/watch/mux.go
index e01d519060b..734ac1411f6 100644
--- a/staging/src/k8s.io/apimachinery/pkg/watch/mux.go
+++ b/staging/src/k8s.io/apimachinery/pkg/watch/mux.go
@@ -17,6 +17,7 @@ limitations under the License.
 package watch
 
 import (
+	"fmt"
 	"sync"
 
 	"k8s.io/apimachinery/pkg/runtime"
@@ -44,8 +45,11 @@ type Broadcaster struct {
 	nextWatcher  int64
 	distributing sync.WaitGroup
 
-	incoming chan Event
-	stopped  chan struct{}
+	// incomingBlock allows us to ensure we don't race and end up sending events
+	// to a closed channel following a broadcaster shutdown.
+	incomingBlock sync.Mutex
+	incoming      chan Event
+	stopped       chan struct{}
 
 	// How large to make watcher's channel.
 	watchQueueLength int
@@ -132,7 +136,7 @@ func (m *Broadcaster) blockQueue(f func()) {
 // Note: new watchers will only receive new events. They won't get an entire history
 // of previous events. It will block until the watcher is actually added to the
 // broadcaster.
-func (m *Broadcaster) Watch() Interface {
+func (m *Broadcaster) Watch() (Interface, error) {
 	var w *broadcasterWatcher
 	m.blockQueue(func() {
 		id := m.nextWatcher
@@ -146,11 +150,9 @@ func (m *Broadcaster) Watch() Interface {
 		m.watchers[id] = w
 	})
 	if w == nil {
-		// The panic here is to be consistent with the previous interface behavior
-		// we are willing to re-evaluate in the future.
-		panic("broadcaster already stopped")
+		return nil, fmt.Errorf("broadcaster already stopped")
 	}
-	return w
+	return w, nil
 }
 
 // WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
@@ -158,7 +160,7 @@ func (m *Broadcaster) Watch() Interface {
 // The returned watch will have a queue length that is at least large enough to accommodate
 // all of the items in queuedEvents. It will block until the watcher is actually added to
 // the broadcaster.
-func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
+func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) (Interface, error) {
 	var w *broadcasterWatcher
 	m.blockQueue(func() {
 		id := m.nextWatcher
@@ -179,11 +181,9 @@ func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
 		}
 	})
 	if w == nil {
-		// The panic here is to be consistent with the previous interface behavior
-		// we are willing to re-evaluate in the future.
-		panic("broadcaster already stopped")
+		return nil, fmt.Errorf("broadcaster already stopped")
 	}
-	return w
+	return w, nil
 }
 
 // stopWatching stops the given watcher and removes it from the list.
@@ -210,19 +210,38 @@ func (m *Broadcaster) closeAll() {
 }
 
 // Action distributes the given event among all watchers.
-func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
+func (m *Broadcaster) Action(action EventType, obj runtime.Object) error {
+	m.incomingBlock.Lock()
+	defer m.incomingBlock.Unlock()
+	select {
+	case <-m.stopped:
+		return fmt.Errorf("broadcaster already stopped")
+	default:
+	}
+
 	m.incoming <- Event{action, obj}
+	return nil
 }
 
 // Action distributes the given event among all watchers, or drops it on the floor
 // if too many incoming actions are queued up. Returns true if the action was sent,
 // false if dropped.
-func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
+func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) {
+	m.incomingBlock.Lock()
+	defer m.incomingBlock.Unlock()
+
+	// Ensure that if the broadcaster is stopped we do not send events to it.
+	select {
+	case <-m.stopped:
+		return false, fmt.Errorf("broadcaster already stopped")
+	default:
+	}
+
 	select {
 	case m.incoming <- Event{action, obj}:
-		return true
+		return true, nil
 	default:
-		return false
+		return false, nil
 	}
 }
 
@@ -233,6 +252,8 @@ func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
 // have received the data yet as it can remain sitting in the buffered
 // channel. It will block until the broadcaster stop request is actually executed
 func (m *Broadcaster) Shutdown() {
+	m.incomingBlock.Lock()
+	defer m.incomingBlock.Unlock()
 	m.blockQueue(func() {
 		close(m.stopped)
 		close(m.incoming)
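With the mux.go changes above, Watch, WatchWithPrefix, Action, and ActionOrDrop report "broadcaster already stopped" as an error instead of panicking or racing against Shutdown. The sketch below shows how a caller might use the updated API; it is illustrative only, and the broadcastOne helper and its runtime.Object argument are not part of this patch.

// Sketch of a caller adapting to the error-returning Broadcaster API.
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
)

// broadcastOne is a hypothetical helper: it fans a single event out to one
// watcher and checks the errors that previously surfaced as panics.
func broadcastOne(obj runtime.Object) error {
	m := watch.NewBroadcaster(1, watch.WaitIfChannelFull)
	defer m.Shutdown()

	w, err := m.Watch()
	if err != nil {
		// Previously this was a panic("broadcaster already stopped").
		return fmt.Errorf("adding watcher: %w", err)
	}
	defer w.Stop()

	if err := m.Action(watch.Added, obj); err != nil {
		return fmt.Errorf("sending event: %w", err)
	}

	ev := <-w.ResultChan()
	fmt.Printf("received %s event\n", ev.Type)
	return nil
}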
diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go b/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go
index 544e571365c..dadec3e9bae 100644
--- a/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go
+++ b/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 	"time"
+	"github.com/stretchr/testify/assert"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/wait"
 )
@@ -58,6 +59,10 @@ func TestBroadcaster(t *testing.T) {
 	wg := sync.WaitGroup{}
 	wg.Add(testWatchers)
 	for i := 0; i < testWatchers; i++ {
+		w, err := m.Watch()
+		if err != nil {
+			t.Fatalf("Unable to start event watcher: '%v' (will not retry!)", err)
+		}
 		// Verify that each watcher gets the events in the correct order
 		go func(watcher int, w Interface) {
 			tableLine := 0
@@ -75,7 +80,7 @@ func TestBroadcaster(t *testing.T) {
 				tableLine++
 			}
 			wg.Done()
-		}(i, m.Watch())
+		}(i, w)
 	}
 
 	for i, item := range table {
@@ -90,8 +95,14 @@ func TestBroadcasterWatcherClose(t *testing.T) {
 	m := NewBroadcaster(0, WaitIfChannelFull)
-	w := m.Watch()
-	w2 := m.Watch()
+	w, err := m.Watch()
+	if err != nil {
+		t.Fatalf("Unable to start event watcher: '%v' (will not retry!)", err)
+	}
+	w2, err := m.Watch()
+	if err != nil {
+		t.Fatalf("Unable to start event watcher: '%v' (will not retry!)", err)
+	}
 	w.Stop()
 	m.Shutdown()
 	if _, open := <-w.ResultChan(); open {
@@ -108,6 +119,14 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
 	done := make(chan bool)
 	m := NewBroadcaster(0, WaitIfChannelFull)
+	w, err := m.Watch()
+	if err != nil {
+		t.Fatalf("Unable to start event watcher: '%v' (will not retry!)", err)
+	}
+	w2, err := m.Watch()
+	if err != nil {
+		t.Fatalf("Unable to start event watcher: '%v' (will not retry!)", err)
+	}
 	go func(w0, w1 Interface) {
 		// We know Broadcaster is in the distribute loop once one watcher receives
 		// an event. Stop the other watcher while distribute is trying to
@@ -119,7 +138,7 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
 			w0.Stop()
 		}
 		close(done)
-	}(m.Watch(), m.Watch())
+	}(w, w2)
 	m.Action(Added, &myType{})
 	select {
 	case <-time.After(wait.ForeverTestTimeout):
@@ -137,8 +156,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
 
 	// Add a couple watchers
 	watches := make([]Interface, 2)
+	var err error
 	for i := range watches {
-		watches[i] = m.Watch()
+		watches[i], err = m.Watch()
+		if err != nil {
+			t.Fatalf("Unable to start event watcher: '%v' (will not retry!)", err)
+		}
 	}
 
 	// Send a couple events before closing the broadcast channel.
@@ -194,33 +217,32 @@ func TestBroadcasterWatchAfterShutdown(t *testing.T) {
 	m := NewBroadcaster(0, WaitIfChannelFull)
 	m.Shutdown()
 
-	watch := func() {
-		defer func() {
-			if err := recover(); err == nil {
-				t.Error("should cause panic")
-			}
-		}()
-		m.Watch()
-	}
-	watch()
+	_, err := m.Watch()
+	assert.EqualError(t, err, "broadcaster already stopped", "Watch should report error if broadcaster is shutdown")
 
-	watchWithPrefix := func() {
-		defer func() {
-			if err := recover(); err == nil {
-				t.Error("should cause panic")
-			}
-		}()
-		m.WatchWithPrefix([]Event{event1, event2})
-	}
-	watchWithPrefix()
-
-	action := func() {
-		defer func() {
-			if err := recover(); err == nil {
-				t.Error("should cause panic")
-			}
-		}()
-		m.Action(event1.Type, event1.Object)
-	}
-	action()
+	_, err = m.WatchWithPrefix([]Event{event1, event2})
+	assert.EqualError(t, err, "broadcaster already stopped", "WatchWithPrefix should report error if broadcaster is shutdown")
+}
+
+func TestBroadcasterSendEventAfterShutdown(t *testing.T) {
+	m := NewBroadcaster(1, DropIfChannelFull)
+
+	event := Event{Type: Added, Object: &myType{"foo", "hello world"}}
+
+	// Add a couple watchers
+	watches := make([]Interface, 2)
+	for i := range watches {
+		watches[i], _ = m.Watch()
+	}
+	m.Shutdown()
+
+	// Send a couple events after closing the broadcast channel.
+	t.Log("Sending event")
+
+	err := m.Action(event.Type, event.Object)
+	assert.EqualError(t, err, "broadcaster already stopped", "Action should report error if broadcaster is shutdown")
+
+	sendOnClosed, err := m.ActionOrDrop(event.Type, event.Object)
+	assert.Equal(t, false, sendOnClosed, "ActionOrDrop should return false if broadcaster is already shutdown")
+	assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error if broadcaster is shutdown")
 }
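The new incomingBlock mutex closes the window between the stopped check and the send on m.incoming, so a concurrent Shutdown can no longer lead to a send on a closed channel. A race-style test along the lines below could exercise that directly; it is a sketch only, the test name is hypothetical, and it assumes the same package and helpers (myType) as mux_test.go.

// Hypothetical addition to mux_test.go: hammer Action while Shutdown runs.
// With incomingBlock in place the worst case is an error return, never a
// send on a closed channel.
func TestBroadcasterConcurrentActionAndShutdown(t *testing.T) {
	m := NewBroadcaster(0, WaitIfChannelFull)
	w, err := m.Watch()
	if err != nil {
		t.Fatalf("Unable to start event watcher: '%v' (will not retry!)", err)
	}
	// Drain the watcher so the distribute loop never blocks.
	go func() {
		for range w.ResultChan() {
		}
	}()

	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 100; i++ {
			// Each call either queues the event or reports
			// "broadcaster already stopped"; a panic here would mean
			// the mutex is not doing its job.
			_ = m.Action(Added, &myType{})
		}
	}()
	m.Shutdown()
	<-done
}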
diff --git a/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go b/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go
index 6151582c8c4..90b6e5cf93c 100644
--- a/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go
+++ b/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go
@@ -275,11 +275,11 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac
 			// clients).
 			changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()})
 		}
-		return f.Broadcaster.WatchWithPrefix(changes), nil
+		return f.Broadcaster.WatchWithPrefix(changes)
 	} else if rc > f.lastRV {
 		return nil, errors.New("resource version in the future not supported by this fake")
 	}
-	return f.Broadcaster.Watch(), nil
+	return f.Broadcaster.Watch()
 }
 
 // Shutdown closes the underlying broadcaster, waiting for events to be
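For consumers of FakeControllerSource the only visible difference is that the broadcaster error now surfaces through Watch instead of a panic. A usage sketch follows; the watchFakeSource helper, the import aliases, and the example object are assumptions for illustration, not part of this patch.

// Sketch of a FakeControllerSource consumer after this change.
package example

import (
	"log"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	fcache "k8s.io/client-go/tools/cache/testing"
)

func watchFakeSource() {
	source := fcache.NewFakeControllerSource()
	defer source.Shutdown()

	w, err := source.Watch(metav1.ListOptions{ResourceVersion: "0"})
	if err != nil {
		// Surfaces the underlying "broadcaster already stopped" error
		// instead of panicking inside the broadcaster.
		log.Fatalf("watch failed: %v", err)
	}
	defer w.Stop()

	// Objects added after the watch starts are delivered through the broadcaster.
	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1"}})
	ev := <-w.ResultChan()
	log.Printf("got %s for %T", ev.Type, ev.Object)
}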
diff --git a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go
index 3dd3cea1a9d..dd7e0aa12e2 100644
--- a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go
+++ b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go
@@ -307,7 +307,15 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func
 // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
 // The return value is used to stop recording
 func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
-	watcher := e.Watch()
+	watcher, err := e.Watch()
+	if err != nil {
+		klog.Errorf("Unable to start event watcher: '%v' (will not retry!)", err)
+		// TODO: Rewrite the function signature to return an error; for
+		// now just return a no-op function.
+		return func() {
+			klog.Error("The event watcher failed to start")
+		}
+	}
 	go func() {
 		defer utilruntime.HandleCrash()
 		for {
diff --git a/staging/src/k8s.io/client-go/tools/record/event.go b/staging/src/k8s.io/client-go/tools/record/event.go
index b901d2e8a0a..a01f3288b8e 100644
--- a/staging/src/k8s.io/client-go/tools/record/event.go
+++ b/staging/src/k8s.io/client-go/tools/record/event.go
@@ -298,7 +298,10 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc
 // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
 // The return value can be ignored or used to stop recording, if desired.
 func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
-	watcher := e.Watch()
+	watcher, err := e.Watch()
+	if err != nil {
+		klog.Errorf("Unable to start event watcher: '%v' (will not retry!)", err)
+	}
 	go func() {
 		defer utilruntime.HandleCrash()
 		for watchEvent := range watcher.ResultChan() {
@@ -346,7 +349,12 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
 	// when we go to shut down this broadcaster. Just drop events if we get overloaded,
 	// and log an error if that happens (we've configured the broadcaster to drop
 	// outgoing events anyway).
-	if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
+	sent, err := recorder.ActionOrDrop(watch.Added, event)
+	if err != nil {
+		klog.Errorf("unable to record event: %v (will not retry!)", err)
+		return
+	}
+	if !sent {
 		klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
 	}
 }
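The user-visible effect in client-go's record package is that recording an event after the broadcaster has been shut down logs an error and drops the event instead of panicking. A caller-side sketch under that assumption follows; the recordAfterShutdown helper is illustrative, while the record, scheme, and core/v1 APIs it uses are the standard ones.

// Hypothetical caller-side view of the record package change.
package example

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

func recordAfterShutdown() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example-pod", Namespace: "default"}}

	broadcaster := record.NewBroadcaster()
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example"})

	broadcaster.Shutdown()

	// Before this change the internal ActionOrDrop could race with Shutdown and
	// panic on a closed channel; with the error path above, generateEvent logs
	// "unable to record event: broadcaster already stopped (will not retry!)"
	// and drops the event.
	recorder.Eventf(pod, v1.EventTypeNormal, "Example", "recorded after shutdown: %s", pod.Name)
}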