diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 0c87111ee16..5f3bd5b7b66 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -56,9 +56,10 @@ func (m *Mux) Watch() Interface { id := m.nextWatcher m.nextWatcher++ w := &muxWatcher{ - result: make(chan Event), - id: id, - m: m, + result: make(chan Event), + stopped: make(chan struct{}), + id: id, + m: m, } m.watchers[id] = w return w @@ -114,20 +115,28 @@ func (m *Mux) loop() { m.closeAll() } +var testHookMuxDistribute = func() {} + // distribute sends event to all watchers. Blocking. func (m *Mux) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() + testHookMuxDistribute() for _, w := range m.watchers { - w.result <- event + select { + case w.result <- event: + case <-w.stopped: + } } } // muxWatcher handles a single watcher of a mux type muxWatcher struct { - result chan Event - id int64 - m *Mux + result chan Event + stopped chan struct{} + stop sync.Once + id int64 + m *Mux } // ResultChan returns a channel to use for waiting on events. @@ -137,5 +146,8 @@ func (mw *muxWatcher) ResultChan() <-chan Event { // Stop stops watching and removes mw from its list. func (mw *muxWatcher) Stop() { - mw.m.stopWatching(mw.id) + mw.stop.Do(func() { + close(mw.stopped) + mw.m.stopWatching(mw.id) + }) } diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index a9433a4bea8..679261fc4b2 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -20,6 +20,7 @@ import ( "reflect" "sync" "testing" + "time" ) type myType struct { @@ -91,3 +92,24 @@ func TestMuxWatcherClose(t *testing.T) { w.Stop() w2.Stop() } + +func TestMuxWatcherStopDeadlock(t *testing.T) { + defer func(fn func()) { testHookMuxDistribute = fn }(testHookMuxDistribute) + sig, done := make(chan bool), make(chan bool) + testHookMuxDistribute = func() { sig <- true } + m := NewMux(0) + go func(w Interface) { + // Imagine this goroutine was receiving from w.ResultChan() + // until it received some signal and stopped watching. + <-sig + w.Stop() + close(done) + }(m.Watch()) + m.Action(Added, &myType{}) + select { + case <-time.After(5 * time.Second): + t.Error("timeout: deadlocked") + case <-done: + } + m.Shutdown() +}