Merge pull request #98653 from mandelsoft/stream

fix race condition problem in streamwatcher
This commit is contained in:
Kubernetes Prow Robot 2021-03-08 11:04:25 -08:00 committed by GitHub
commit ab7d68a58a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 16 deletions

View File

@ -55,7 +55,7 @@ type StreamWatcher struct {
source Decoder source Decoder
reporter Reporter reporter Reporter
result chan Event result chan Event
stopped bool done chan struct{}
} }
// NewStreamWatcher creates a StreamWatcher from the given decoder. // NewStreamWatcher creates a StreamWatcher from the given decoder.
@ -67,6 +67,11 @@ func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
// goroutine/channel, but impossible for them to remove it, // goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better. // so nonbuffered is better.
result: make(chan Event), result: make(chan Event),
// If the watcher is externally stopped there is no receiver anymore
// and the send operations on the result channel, especially the
// error reporting might block forever.
// Therefore a dedicated stop channel is used to resolve this blocking.
done: make(chan struct{}),
} }
go sw.receive() go sw.receive()
return sw return sw
@ -82,19 +87,15 @@ func (sw *StreamWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag. // Call Close() exactly once by locking and setting a flag.
sw.Lock() sw.Lock()
defer sw.Unlock() defer sw.Unlock()
if !sw.stopped { // closing a closed channel always panics, therefore check before closing
sw.stopped = true select {
case <-sw.done:
default:
close(sw.done)
sw.source.Close() sw.source.Close()
} }
} }
// stopping returns true if Stop() was called previously.
func (sw *StreamWatcher) stopping() bool {
sw.Lock()
defer sw.Unlock()
return sw.stopped
}
// receive reads result from the decoder in a loop and sends down the result channel. // receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() { func (sw *StreamWatcher) receive() {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
@ -103,10 +104,6 @@ func (sw *StreamWatcher) receive() {
for { for {
action, obj, err := sw.source.Decode() action, obj, err := sw.source.Decode()
if err != nil { if err != nil {
// Ignore expected error.
if sw.stopping() {
return
}
switch err { switch err {
case io.EOF: case io.EOF:
// watch closed normally // watch closed normally
@ -116,17 +113,24 @@ func (sw *StreamWatcher) receive() {
if net.IsProbableEOF(err) || net.IsTimeout(err) { if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else { } else {
sw.result <- Event{ select {
case <-sw.done:
case sw.result <- Event{
Type: Error, Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)), Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
}:
} }
} }
} }
return return
} }
sw.result <- Event{ select {
case <-sw.done:
return
case sw.result <- Event{
Type: action, Type: action,
Object: obj, Object: obj,
}:
} }
} }
} }

View File

@ -21,6 +21,7 @@ import (
"io" "io"
"reflect" "reflect"
"testing" "testing"
"time"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
. "k8s.io/apimachinery/pkg/watch" . "k8s.io/apimachinery/pkg/watch"
@ -105,3 +106,16 @@ func TestStreamWatcherError(t *testing.T) {
t.Fatalf("unexpected open channel") t.Fatalf("unexpected open channel")
} }
} }
func TestStreamWatcherRace(t *testing.T) {
fd := fakeDecoder{err: fmt.Errorf("test error")}
fr := &fakeReporter{}
sw := NewStreamWatcher(fd, fr)
time.Sleep(10 * time.Millisecond)
sw.Stop()
time.Sleep(10 * time.Millisecond)
_, ok := <-sw.ResultChan()
if ok {
t.Fatalf("unexpected pending send")
}
}