From 2355ceb79a2019850d640d00be40b97cc133fd4d Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Sat, 6 Mar 2021 14:42:59 +0100 Subject: [PATCH] simplier fix + test for race condition --- .../apimachinery/pkg/watch/streamwatcher.go | 31 +++---------------- .../pkg/watch/streamwatcher_test.go | 14 +++++++++ 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 48a5f0f5770..42dcac2b9e6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -56,7 +56,6 @@ type StreamWatcher struct { reporter Reporter result chan Event done chan struct{} - stopped bool } // NewStreamWatcher creates a StreamWatcher from the given decoder. @@ -88,20 +87,15 @@ func (sw *StreamWatcher) Stop() { // Call Close() exactly once by locking and setting a flag. sw.Lock() defer sw.Unlock() - if !sw.stopped { - sw.stopped = true + // closing a closed channel always panics, therefore check before closing + select { + case <-sw.done: + default: close(sw.done) sw.source.Close() } } -// stopping returns true if Stop() was called previously. -func (sw *StreamWatcher) stopping() bool { - sw.Lock() - defer sw.Unlock() - return sw.stopped -} - // receive reads result from the decoder in a loop and sends down the result channel. func (sw *StreamWatcher) receive() { defer utilruntime.HandleCrash() @@ -110,10 +104,6 @@ func (sw *StreamWatcher) receive() { for { action, obj, err := sw.source.Decode() if err != nil { - // Ignore expected error. - if sw.stopping() { - return - } switch err { case io.EOF: // watch closed normally @@ -124,20 +114,7 @@ func (sw *StreamWatcher) receive() { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { select { - // TODO: figure out how to test that sending the error after an externally stopped watcher - // cannot block anymore because of the introduced done channel. - // The function receive checks under a lock whether the watch has been stopped, - // before an error from the watch stream is reported to the result channel. - // The problem here is, that in between the watcher might be stopped by - // calling the Stop method. In the actual code this is done by the - // cache.Reflector using the streamwatcher by a defer (method watchHandler) - // which is executed after the caller already stopped reading from the result channel. - // As a result the stopping flag might be set after the check - // and trying to send the error event blocks this send operation forever, - // because there will never be a receiver again. - // This results in dead go routines trying to send on the result channel, forever. case <-sw.done: - return case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go index 685a0f13a90..0b459c89586 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher_test.go @@ -21,6 +21,7 @@ import ( "io" "reflect" "testing" + "time" "k8s.io/apimachinery/pkg/runtime" . "k8s.io/apimachinery/pkg/watch" @@ -105,3 +106,16 @@ func TestStreamWatcherError(t *testing.T) { t.Fatalf("unexpected open channel") } } + +func TestStreamWatcherRace(t *testing.T) { + fd := fakeDecoder{err: fmt.Errorf("test error")} + fr := &fakeReporter{} + sw := NewStreamWatcher(fd, fr) + time.Sleep(10 * time.Millisecond) + sw.Stop() + time.Sleep(10 * time.Millisecond) + _, ok := <-sw.ResultChan() + if ok { + t.Fatalf("unexpected pending send") + } +}