diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go b/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go index dadec3e9bae..aa3f3b7bb6d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package watch_test +package watch import ( "reflect" @@ -26,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - . "k8s.io/apimachinery/pkg/watch" ) type myType struct { @@ -246,3 +245,47 @@ func TestBroadcasterSendEventAfterShutdown(t *testing.T) { assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown") assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown") } + +// Test this since we see usage patterns where the broadcaster and watchers are +// stopped simultaneously leading to races. +func TestBroadcasterShutdownRace(t *testing.T) { + m := NewBroadcaster(1, WaitIfChannelFull) + stopCh := make(chan struct{}) + + // Add a bunch of watchers + const testWatchers = 2 + for i := 0; i < testWatchers; i++ { + i := i + + _, err := m.Watch() + if err != nil { + t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err) + } + // This is how we force the watchers to close down independently of the + // eventbroadcaster, see real usage pattern in startRecordingEvents() + go func() { + <-stopCh + t.Log("Stopping Watchers") + m.stopWatching(int64(i)) + }() + } + + event := Event{Type: Added, Object: &myType{"foo", "hello world"}} + err := m.Action(event.Type, event.Object) + if err != nil { + t.Fatalf("error sending event: %v", err) + } + + // Manually simulate m.Shutdown() but change it to force a race scenario + // 1. Close watcher stopchannel, so watchers are closed independently of the + // eventBroadcaster + // 2. Shutdown the m.incoming slightly Before m.stopped so that the watcher's + // call of Blockqueue can pass the m.stopped check. + m.blockQueue(func() { + close(stopCh) + close(m.incoming) + time.Sleep(1 * time.Millisecond) + close(m.stopped) + }) + m.distributing.Wait() +}