From 435e0b73bb99862f9dedf56a50260ff3dfef14ff Mon Sep 17 00:00:00 2001 From: Andrew M Bursavich Date: Mon, 15 Sep 2014 12:07:32 -0700 Subject: [PATCH 1/2] pkg/watch: fix potential deadlock --- pkg/watch/mux.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 0c87111ee16..2d40014fb5e 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -56,9 +56,10 @@ func (m *Mux) Watch() Interface { id := m.nextWatcher m.nextWatcher++ w := &muxWatcher{ - result: make(chan Event), - id: id, - m: m, + result: make(chan Event), + stopped: make(chan struct{}), + id: id, + m: m, } m.watchers[id] = w return w @@ -119,15 +120,20 @@ func (m *Mux) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() for _, w := range m.watchers { - w.result <- event + select { + case w.result <- event: + case <-w.stopped: + } } } // muxWatcher handles a single watcher of a mux type muxWatcher struct { - result chan Event - id int64 - m *Mux + result chan Event + stopped chan struct{} + stop sync.Once + id int64 + m *Mux } // ResultChan returns a channel to use for waiting on events. @@ -137,5 +143,8 @@ func (mw *muxWatcher) ResultChan() <-chan Event { // Stop stops watching and removes mw from its list. func (mw *muxWatcher) Stop() { - mw.m.stopWatching(mw.id) + mw.stop.Do(func() { + close(mw.stopped) + mw.m.stopWatching(mw.id) + }) } From b9dcfbad761db2ad9ae27847f6f794286872b0c7 Mon Sep 17 00:00:00 2001 From: Andrew M Bursavich Date: Mon, 15 Sep 2014 16:37:52 -0700 Subject: [PATCH 2/2] pkg/watch: deadlock test --- pkg/watch/mux.go | 3 +++ pkg/watch/mux_test.go | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 2d40014fb5e..5f3bd5b7b66 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -115,10 +115,13 @@ func (m *Mux) loop() { m.closeAll() } +var testHookMuxDistribute = func() {} + // distribute sends event to all watchers. Blocking. func (m *Mux) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() + testHookMuxDistribute() for _, w := range m.watchers { select { case w.result <- event: diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index a9433a4bea8..679261fc4b2 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -20,6 +20,7 @@ import ( "reflect" "sync" "testing" + "time" ) type myType struct { @@ -91,3 +92,24 @@ func TestMuxWatcherClose(t *testing.T) { w.Stop() w2.Stop() } + +func TestMuxWatcherStopDeadlock(t *testing.T) { + defer func(fn func()) { testHookMuxDistribute = fn }(testHookMuxDistribute) + sig, done := make(chan bool), make(chan bool) + testHookMuxDistribute = func() { sig <- true } + m := NewMux(0) + go func(w Interface) { + // Imagine this goroutine was receiving from w.ResultChan() + // until it received some signal and stopped watching. + <-sig + w.Stop() + close(done) + }(m.Watch()) + m.Action(Added, &myType{}) + select { + case <-time.After(5 * time.Second): + t.Error("timeout: deadlocked") + case <-done: + } + m.Shutdown() +}