mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
simplier fix + test for race condition
This commit is contained in:
parent
932f98acaf
commit
2355ceb79a
@ -56,7 +56,6 @@ type StreamWatcher struct {
|
||||
reporter Reporter
|
||||
result chan Event
|
||||
done chan struct{}
|
||||
stopped bool
|
||||
}
|
||||
|
||||
// NewStreamWatcher creates a StreamWatcher from the given decoder.
|
||||
@ -88,20 +87,15 @@ func (sw *StreamWatcher) Stop() {
|
||||
// Call Close() exactly once by locking and setting a flag.
|
||||
sw.Lock()
|
||||
defer sw.Unlock()
|
||||
if !sw.stopped {
|
||||
sw.stopped = true
|
||||
// closing a closed channel always panics, therefore check before closing
|
||||
select {
|
||||
case <-sw.done:
|
||||
default:
|
||||
close(sw.done)
|
||||
sw.source.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// stopping returns true if Stop() was called previously.
|
||||
func (sw *StreamWatcher) stopping() bool {
|
||||
sw.Lock()
|
||||
defer sw.Unlock()
|
||||
return sw.stopped
|
||||
}
|
||||
|
||||
// receive reads result from the decoder in a loop and sends down the result channel.
|
||||
func (sw *StreamWatcher) receive() {
|
||||
defer utilruntime.HandleCrash()
|
||||
@ -110,10 +104,6 @@ func (sw *StreamWatcher) receive() {
|
||||
for {
|
||||
action, obj, err := sw.source.Decode()
|
||||
if err != nil {
|
||||
// Ignore expected error.
|
||||
if sw.stopping() {
|
||||
return
|
||||
}
|
||||
switch err {
|
||||
case io.EOF:
|
||||
// watch closed normally
|
||||
@ -124,20 +114,7 @@ func (sw *StreamWatcher) receive() {
|
||||
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
|
||||
} else {
|
||||
select {
|
||||
// TODO: figure out how to test that sending the error after an externally stopped watcher
|
||||
// cannot block anymore because of the introduced done channel.
|
||||
// The function receive checks under a lock whether the watch has been stopped,
|
||||
// before an error from the watch stream is reported to the result channel.
|
||||
// The problem here is, that in between the watcher might be stopped by
|
||||
// calling the Stop method. In the actual code this is done by the
|
||||
// cache.Reflector using the streamwatcher by a defer (method watchHandler)
|
||||
// which is executed after the caller already stopped reading from the result channel.
|
||||
// As a result the stopping flag might be set after the check
|
||||
// and trying to send the error event blocks this send operation forever,
|
||||
// because there will never be a receiver again.
|
||||
// This results in dead go routines trying to send on the result channel, forever.
|
||||
case <-sw.done:
|
||||
return
|
||||
case sw.result <- Event{
|
||||
Type: Error,
|
||||
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
. "k8s.io/apimachinery/pkg/watch"
|
||||
@ -105,3 +106,16 @@ func TestStreamWatcherError(t *testing.T) {
|
||||
t.Fatalf("unexpected open channel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamWatcherRace(t *testing.T) {
|
||||
fd := fakeDecoder{err: fmt.Errorf("test error")}
|
||||
fr := &fakeReporter{}
|
||||
sw := NewStreamWatcher(fd, fr)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
sw.Stop()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
_, ok := <-sw.ResultChan()
|
||||
if ok {
|
||||
t.Fatalf("unexpected pending send")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user