diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 99f6770b919..4960dbf1cea 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -55,6 +55,7 @@ type StreamWatcher struct { source Decoder reporter Reporter result chan Event + done chan struct{} stopped bool } @@ -67,6 +68,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher { // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. result: make(chan Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. + done: make(chan struct{}), } go sw.receive() return sw @@ -84,6 +90,7 @@ func (sw *StreamWatcher) Stop() { defer sw.Unlock() if !sw.stopped { sw.stopped = true + close(sw.done) sw.source.Close() } } @@ -116,17 +123,25 @@ func (sw *StreamWatcher) receive() { if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: Error, Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), + }: } } } return } - sw.result <- Event{ + select { + case <-sw.done: + return + case sw.result <- Event{ Type: action, Object: obj, + }: } } }