fix sync problem in streamwatcher

The streamwatcher has a synchronization problem that may lead to
a go routine blocking forever when closing a stream watch.

This occasionally happens, when informers are cancelled together with the
watch request using the stop channel, which leads to an increaing
number of blocked go routines, if imformers are dynamicaly created and deleted
again.

The function `receive` checks under a lock whether the watch has been stopped,
before an error is reported to the result channel.
The problem here is, that in between the watcher might be stopped by
calling the `Stop` method. In the actual code this is done by the
`cache.Reflector` using the streamwatcher by a defer which is executed after
the caller already stopped reading from the result channel.
As a result the stopping flag might be set after the check
and trying to send the error event blocks this send operation forever,
because there will never be a receiver again.

The fix introduces a dedicated local stop channel that is closed by the
`Stop` method and used in a select statement together with the send
operation to finally abort the loop.
This commit is contained in:
Uwe Krueger 2021-01-31 17:19:09 +01:00
parent 8306eeab75
commit d8ee8e427e

View File

@ -55,6 +55,7 @@ type StreamWatcher struct {
source Decoder
reporter Reporter
result chan Event
done chan struct{}
stopped bool
}
@ -67,6 +68,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
// If the watcher is externally stopped there is no receiver anymore
// and the send operations on the result channel, especially the
// error reporting might block forever.
// Therefore a dedicated stop channel is used to resolve this blocking.
done: make(chan struct{}),
}
go sw.receive()
return sw
@ -84,6 +90,7 @@ func (sw *StreamWatcher) Stop() {
defer sw.Unlock()
if !sw.stopped {
sw.stopped = true
close(sw.done)
sw.source.Close()
}
}
@ -116,17 +123,25 @@ func (sw *StreamWatcher) receive() {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
sw.result <- Event{
select {
case <-sw.done:
return
case sw.result <- Event{
Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
}:
}
}
}
return
}
sw.result <- Event{
select {
case <-sw.done:
return
case sw.result <- Event{
Type: action,
Object: obj,
}:
}
}
}