From d8ee8e427e4daf8406ee94e919c94014600bb477 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Sun, 31 Jan 2021 17:19:09 +0100 Subject: [PATCH] fix sync problem in streamwatcher The streamwatcher has a synchronization problem that may lead to a go routine blocking forever when closing a stream watch. This occasionally happens, when informers are cancelled together with the watch request using the stop channel, which leads to an increaing number of blocked go routines, if imformers are dynamicaly created and deleted again. The function `receive` checks under a lock whether the watch has been stopped, before an error is reported to the result channel. The problem here is, that in between the watcher might be stopped by calling the `Stop` method. In the actual code this is done by the `cache.Reflector` using the streamwatcher by a defer which is executed after the caller already stopped reading from the result channel. As a result the stopping flag might be set after the check and trying to send the error event blocks this send operation forever, because there will never be a receiver again. The fix introduces a dedicated local stop channel that is closed by the `Stop` method and used in a select statement together with the send operation to finally abort the loop. --- .../apimachinery/pkg/watch/streamwatcher.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 99f6770b919..4960dbf1cea 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -55,6 +55,7 @@ type StreamWatcher struct { source Decoder reporter Reporter result chan Event + done chan struct{} stopped bool } @@ -67,6 +68,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. result: make(chan Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. + done: make(chan struct{}), } go sw.receive() return sw @@ -84,6 +90,7 @@ func (sw *StreamWatcher) Stop() { defer sw.Unlock() if !sw.stopped { sw.stopped = true + close(sw.done) sw.source.Close() } } @@ -116,17 +123,25 @@ func (sw *StreamWatcher) receive() { if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), + }: } } } return } - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: action, Object: obj, + }: } } }