From ad243edaa3bfcefbb69ab3ff42cb6fe39938599d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Sat, 22 Aug 2015 15:01:56 -0700 Subject: [PATCH] Fix race condition in watch The Shutdown() call returned immediately, without waiting for all event distributions to be completed. Even worse, it would close all the watcher result channels before all the info was sent to them. Properly wait for all distributor goroutines - currently only one - to be finished. This fixes the flaky test TestBroadcasterDropIfChannelFull. Bonus cleanup on said test too. --- pkg/watch/mux.go | 14 +++++++++++--- pkg/watch/mux_test.go | 13 +++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 5d1f71768de..ccae32264fb 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -41,8 +41,9 @@ const incomingQueueLength = 25 type Broadcaster struct { lock sync.Mutex - watchers map[int64]*broadcasterWatcher - nextWatcher int64 + watchers map[int64]*broadcasterWatcher + nextWatcher int64 + distributing sync.WaitGroup incoming chan Event @@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B watchQueueLength: queueLength, fullChannelBehavior: fullChannelBehavior, } + m.distributing.Add(1) go m.loop() return m } @@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) { } // Shutdown disconnects all watchers (but any queued events will still be distributed). -// You must not call Action after calling Shutdown. +// You must not call Action or Watch* after calling Shutdown. This call blocks +// until all events have been distributed through the outbound channels. Note +// that since they can be buffered, this means that the watchers might not +// have received the data yet as it can remain sitting in the buffered +// channel. func (m *Broadcaster) Shutdown() { close(m.incoming) + m.distributing.Wait() } // loop receives from m.incoming and distributes to all watchers. @@ -163,6 +170,7 @@ func (m *Broadcaster) loop() { m.distribute(event) } m.closeAll() + m.distributing.Done() } // distribute sends event to all watchers. Blocking. diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index fd31910060c..d3e48279cc6 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { event2 := Event{Added, &myType{"bar", "hello world 2"}} // Add a couple watchers - const testWatchers = 2 - watches := make([]Interface, testWatchers) - for i := 0; i < testWatchers; i++ { + watches := make([]Interface, 2) + for i := range watches { watches[i] = m.Watch() } @@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { // Pull events from the queue. wg := sync.WaitGroup{} - wg.Add(testWatchers) - for i := 0; i < testWatchers; i++ { + wg.Add(len(watches)) + for i := range watches { // Verify that each watcher only gets the first event because its watch // queue of length one was full from the first one. go func(watcher int, w Interface) { @@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { e1, ok := <-w.ResultChan() if !ok { t.Errorf("Watcher %v failed to retrieve first event.", watcher) - return } if e, a := event1, e1; !reflect.DeepEqual(e, a) { t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", watcher, e.Type, e.Object, a.Type, a.Object) - } else { - t.Logf("Got (%v, %#v)", e1.Type, e1.Object) } + t.Logf("Got (%v, %#v)", e1.Type, e1.Object) e2, ok := <-w.ResultChan() if ok { t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",