Merge pull request #1321 from abursavich/master

pkg/watch: fix potential deadlock
This commit is contained in:
Daniel Smith 2014-09-15 18:26:19 -07:00
commit 9c25792dca
2 changed files with 42 additions and 8 deletions

View File

@ -56,9 +56,10 @@ func (m *Mux) Watch() Interface {
id := m.nextWatcher id := m.nextWatcher
m.nextWatcher++ m.nextWatcher++
w := &muxWatcher{ w := &muxWatcher{
result: make(chan Event), result: make(chan Event),
id: id, stopped: make(chan struct{}),
m: m, id: id,
m: m,
} }
m.watchers[id] = w m.watchers[id] = w
return w return w
@ -114,20 +115,28 @@ func (m *Mux) loop() {
m.closeAll() m.closeAll()
} }
var testHookMuxDistribute = func() {}
// distribute sends event to all watchers. Blocking. // distribute sends event to all watchers. Blocking.
func (m *Mux) distribute(event Event) { func (m *Mux) distribute(event Event) {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
testHookMuxDistribute()
for _, w := range m.watchers { for _, w := range m.watchers {
w.result <- event select {
case w.result <- event:
case <-w.stopped:
}
} }
} }
// muxWatcher handles a single watcher of a mux // muxWatcher handles a single watcher of a mux
type muxWatcher struct { type muxWatcher struct {
result chan Event result chan Event
id int64 stopped chan struct{}
m *Mux stop sync.Once
id int64
m *Mux
} }
// ResultChan returns a channel to use for waiting on events. // ResultChan returns a channel to use for waiting on events.
@ -137,5 +146,8 @@ func (mw *muxWatcher) ResultChan() <-chan Event {
// Stop stops watching and removes mw from its list. // Stop stops watching and removes mw from its list.
func (mw *muxWatcher) Stop() { func (mw *muxWatcher) Stop() {
mw.m.stopWatching(mw.id) mw.stop.Do(func() {
close(mw.stopped)
mw.m.stopWatching(mw.id)
})
} }

View File

@ -20,6 +20,7 @@ import (
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
"time"
) )
type myType struct { type myType struct {
@ -91,3 +92,24 @@ func TestMuxWatcherClose(t *testing.T) {
w.Stop() w.Stop()
w2.Stop() w2.Stop()
} }
func TestMuxWatcherStopDeadlock(t *testing.T) {
defer func(fn func()) { testHookMuxDistribute = fn }(testHookMuxDistribute)
sig, done := make(chan bool), make(chan bool)
testHookMuxDistribute = func() { sig <- true }
m := NewMux(0)
go func(w Interface) {
// Imagine this goroutine was receiving from w.ResultChan()
// until it received some signal and stopped watching.
<-sig
w.Stop()
close(done)
}(m.Watch())
m.Action(Added, &myType{})
select {
case <-time.After(5 * time.Second):
t.Error("timeout: deadlocked")
case <-done:
}
m.Shutdown()
}