mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
pkg/watch: fix potential deadlock
This commit is contained in:
parent
24b5b7e8d3
commit
435e0b73bb
@ -56,9 +56,10 @@ func (m *Mux) Watch() Interface {
|
||||
id := m.nextWatcher
|
||||
m.nextWatcher++
|
||||
w := &muxWatcher{
|
||||
result: make(chan Event),
|
||||
id: id,
|
||||
m: m,
|
||||
result: make(chan Event),
|
||||
stopped: make(chan struct{}),
|
||||
id: id,
|
||||
m: m,
|
||||
}
|
||||
m.watchers[id] = w
|
||||
return w
|
||||
@ -119,15 +120,20 @@ func (m *Mux) distribute(event Event) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
for _, w := range m.watchers {
|
||||
w.result <- event
|
||||
select {
|
||||
case w.result <- event:
|
||||
case <-w.stopped:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// muxWatcher handles a single watcher of a mux
|
||||
type muxWatcher struct {
|
||||
result chan Event
|
||||
id int64
|
||||
m *Mux
|
||||
result chan Event
|
||||
stopped chan struct{}
|
||||
stop sync.Once
|
||||
id int64
|
||||
m *Mux
|
||||
}
|
||||
|
||||
// ResultChan returns a channel to use for waiting on events.
|
||||
@ -137,5 +143,8 @@ func (mw *muxWatcher) ResultChan() <-chan Event {
|
||||
|
||||
// Stop stops watching and removes mw from its list.
|
||||
func (mw *muxWatcher) Stop() {
|
||||
mw.m.stopWatching(mw.id)
|
||||
mw.stop.Do(func() {
|
||||
close(mw.stopped)
|
||||
mw.m.stopWatching(mw.id)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user