From d8ee8e427e4daf8406ee94e919c94014600bb477 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Sun, 31 Jan 2021 17:19:09 +0100 Subject: [PATCH 1/3] fix sync problem in streamwatcher The streamwatcher has a synchronization problem that may lead to a go routine blocking forever when closing a stream watch. This occasionally happens, when informers are cancelled together with the watch request using the stop channel, which leads to an increasing number of blocked go routines, if informers are dynamically created and deleted again. The function `receive` checks under a lock whether the watch has been stopped, before an error is reported to the result channel. The problem here is, that in between the watcher might be stopped by calling the `Stop` method. In the actual code this is done by the `cache.Reflector` using the streamwatcher by a defer which is executed after the caller already stopped reading from the result channel. As a result the stopping flag might be set after the check and trying to send the error event blocks this send operation forever, because there will never be a receiver again. The fix introduces a dedicated local stop channel that is closed by the `Stop` method and used in a select statement together with the send operation to finally abort the loop. 
--- .../apimachinery/pkg/watch/streamwatcher.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 99f6770b919..4960dbf1cea 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -55,6 +55,7 @@ type StreamWatcher struct { source Decoder reporter Reporter result chan Event + done chan struct{} stopped bool } @@ -67,6 +68,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. result: make(chan Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. 
+ done: make(chan struct{}), } go sw.receive() return sw @@ -84,6 +90,7 @@ func (sw *StreamWatcher) Stop() { defer sw.Unlock() if !sw.stopped { sw.stopped = true + close(sw.done) sw.source.Close() } } @@ -116,17 +123,25 @@ func (sw *StreamWatcher) receive() { if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), + }: } } } return } - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: action, Object: obj, + }: } } } From 932f98acafbfb4b0cb08d30738dbdc3fa2fcf6b2 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Fri, 5 Mar 2021 17:14:19 +0100 Subject: [PATCH 2/3] add comment describing the race condition + TODO for appropriate test --- .../k8s.io/apimachinery/pkg/watch/streamwatcher.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 4960dbf1cea..48a5f0f5770 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -124,6 +124,18 @@ func (sw *StreamWatcher) receive() { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { select { + // TODO: figure out how to test that sending the error after an externally stopped watcher + // cannot block anymore because of the introduced done channel. + // The function receive checks under a lock whether the watch has been stopped, + // before an error from the watch stream is reported to the result channel. + // The problem here is, that in between the watcher might be stopped by + // calling the Stop method. 
In the actual code this is done by the + // cache.Reflector using the streamwatcher by a defer (method watchHandler) + // which is executed after the caller already stopped reading from the result channel. + // As a result the stopping flag might be set after the check + // and trying to send the error event blocks this send operation forever, + // because there will never be a receiver again. + // This results in dead go routines trying to send on the result channel, forever. case <-sw.done: return case sw.result <- Event{ From 2355ceb79a2019850d640d00be40b97cc133fd4d Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Sat, 6 Mar 2021 14:42:59 +0100 Subject: [PATCH 3/3] simpler fix + test for race condition --- .../apimachinery/pkg/watch/streamwatcher.go | 31 +++---------------- .../pkg/watch/streamwatcher_test.go | 14 +++++++++ 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 48a5f0f5770..42dcac2b9e6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -56,7 +56,6 @@ type StreamWatcher struct { reporter Reporter result chan Event done chan struct{} - stopped bool } // NewStreamWatcher creates a StreamWatcher from the given decoder. @@ -88,20 +87,15 @@ func (sw *StreamWatcher) Stop() { // Call Close() exactly once by locking and setting a flag. sw.Lock() defer sw.Unlock() - if !sw.stopped { - sw.stopped = true + // closing a closed channel always panics, therefore check before closing + select { + case <-sw.done: + default: close(sw.done) sw.source.Close() } } -// stopping returns true if Stop() was called previously. -func (sw *StreamWatcher) stopping() bool { - sw.Lock() - defer sw.Unlock() - return sw.stopped -} - // receive reads result from the decoder in a loop and sends down the result channel. 
func (sw *StreamWatcher) receive() { defer utilruntime.HandleCrash() @@ -110,10 +104,6 @@ func (sw *StreamWatcher) receive() { for { action, obj, err := sw.source.Decode() if err != nil { - // Ignore expected error. - if sw.stopping() { - return - } switch err { case io.EOF: // watch closed normally @@ -124,20 +114,7 @@ func (sw *StreamWatcher) receive() { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { select { - // TODO: figure out how to test that sending the error after an externally stopped watcher - // cannot block anymore because of the introduced done channel. - // The function receive checks under a lock whether the watch has been stopped, - // before an error from the watch stream is reported to the result channel. - // The problem here is, that in between the watcher might be stopped by - // calling the Stop method. In the actual code this is done by the - // cache.Reflector using the streamwatcher by a defer (method watchHandler) - // which is executed after the caller already stopped reading from the result channel. - // As a result the stopping flag might be set after the check - // and trying to send the error event blocks this send operation forever, - // because there will never be a receiver again. - // This results in dead go routines trying to send on the result channel, forever. case <-sw.done: - return case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go index 685a0f13a90..0b459c89586 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go @@ -21,6 +21,7 @@ import ( "io" "reflect" "testing" + "time" "k8s.io/apimachinery/pkg/runtime" . 
"k8s.io/apimachinery/pkg/watch" @@ -105,3 +106,16 @@ func TestStreamWatcherError(t *testing.T) { t.Fatalf("unexpected open channel") } } + +func TestStreamWatcherRace(t *testing.T) { + fd := fakeDecoder{err: fmt.Errorf("test error")} + fr := &fakeReporter{} + sw := NewStreamWatcher(fd, fr) + time.Sleep(10 * time.Millisecond) + sw.Stop() + time.Sleep(10 * time.Millisecond) + _, ok := <-sw.ResultChan() + if ok { + t.Fatalf("unexpected pending send") + } +}