diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/mux.go b/staging/src/k8s.io/apimachinery/pkg/watch/mux.go
index 734ac1411f6..d51f9567e42 100644
--- a/staging/src/k8s.io/apimachinery/pkg/watch/mux.go
+++ b/staging/src/k8s.io/apimachinery/pkg/watch/mux.go
@@ -46,7 +46,7 @@ type Broadcaster struct {
 	distributing sync.WaitGroup
 
 	// incomingBlock allows us to ensure we don't race and end up sending events
-	// to a closed channel following a brodcaster shutdown.
+	// to a closed channel following a broadcaster shutdown.
 	incomingBlock sync.Mutex
 	incoming      chan Event
 	stopped       chan struct{}
@@ -115,6 +115,8 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
 // won't ever see that event, and will always see any event after they are
 // added.
 func (m *Broadcaster) blockQueue(f func()) {
+	m.incomingBlock.Lock()
+	defer m.incomingBlock.Unlock()
 	select {
 	case <-m.stopped:
 		return
@@ -252,8 +254,6 @@ func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool,
 // have received the data yet as it can remain sitting in the buffered
 // channel. It will block until the broadcaster stop request is actually executed
 func (m *Broadcaster) Shutdown() {
-	m.incomingBlock.Lock()
-	defer m.incomingBlock.Unlock()
 	m.blockQueue(func() {
 		close(m.stopped)
 		close(m.incoming)
diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go b/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go
index dadec3e9bae..aa3f3b7bb6d 100644
--- a/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go
+++ b/staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package watch_test
+package watch
 
 import (
 	"reflect"
@@ -26,7 +26,6 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/wait"
-	. "k8s.io/apimachinery/pkg/watch"
 )
 
 type myType struct {
@@ -246,3 +245,47 @@ func TestBroadcasterSendEventAfterShutdown(t *testing.T) {
 	assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown")
 	assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown")
 }
+
+// Test this since we see usage patterns where the broadcaster and watchers are
+// stopped simultaneously, leading to races.
+func TestBroadcasterShutdownRace(t *testing.T) {
+	m := NewBroadcaster(1, WaitIfChannelFull)
+	stopCh := make(chan struct{})
+
+	// Add a bunch of watchers.
+	const testWatchers = 2
+	for i := 0; i < testWatchers; i++ {
+		i := i
+
+		_, err := m.Watch()
+		if err != nil {
+			t.Fatalf("Unable to start event watcher: '%v' (will not retry!)", err)
+		}
+		// This is how we force the watchers to close down independently of the
+		// eventBroadcaster; see the real usage pattern in startRecordingEvents().
+		go func() {
+			<-stopCh
+			t.Log("Stopping Watchers")
+			m.stopWatching(int64(i))
+		}()
+	}
+
+	event := Event{Type: Added, Object: &myType{"foo", "hello world"}}
+	err := m.Action(event.Type, event.Object)
+	if err != nil {
+		t.Fatalf("error sending event: %v", err)
+	}
+
+	// Manually simulate m.Shutdown(), but change it to force a race scenario:
+	// 1. Close the watcher stop channel, so watchers are closed independently
+	// of the eventBroadcaster.
+	// 2. Close m.incoming slightly before m.stopped so that the watcher's
+	// call of blockQueue can pass the m.stopped check.
+	m.blockQueue(func() {
+		close(stopCh)
+		close(m.incoming)
+		time.Sleep(1 * time.Millisecond)
+		close(m.stopped)
+	})
+	m.distributing.Wait()
+}
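
For context, below is a minimal sketch (not part of the patch) of the caller-side pattern this change protects: a Broadcaster being Shutdown() while its watchers are stopped concurrently. It uses only the exported API of k8s.io/apimachinery/pkg/watch as it appears in this patch's version (NewBroadcaster, Watch, Action, Shutdown, WaitIfChannelFull, and the watcher's Stop/ResultChan); the program structure, the drain goroutines, and the use of unstructured.Unstructured as a convenient runtime.Object are illustrative assumptions, not taken from the real startRecordingEvents() caller. With blockQueue now holding incomingBlock, the concurrent Stop() calls can no longer enqueue onto the already-closed incoming channel.

// shutdown_race_sketch.go: illustrative only; assumes the exported watch API
// of the patched package, not the internals exercised by the test above.
package main

import (
	"sync"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	b := watch.NewBroadcaster(1, watch.WaitIfChannelFull)

	var wg sync.WaitGroup
	watchers := make([]watch.Interface, 0, 2)
	for i := 0; i < 2; i++ {
		w, err := b.Watch()
		if err != nil {
			panic(err)
		}
		watchers = append(watchers, w)

		wg.Add(1)
		go func(w watch.Interface) {
			defer wg.Done()
			// Drain events until the result channel is closed, either by the
			// watcher's own Stop() or by the broadcaster's Shutdown().
			for range w.ResultChan() {
			}
		}(w)
	}

	// Publish one event so there is something in flight when shutdown starts.
	_ = b.Action(watch.Added, &unstructured.Unstructured{})

	// Stop the watchers concurrently with shutting down the broadcaster.
	// Before the fix, a watcher's Stop() funnels into blockQueue and could
	// race with Shutdown(), sending on the already-closed incoming channel.
	for _, w := range watchers {
		go w.Stop()
	}
	b.Shutdown()
	wg.Wait()
}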