diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/mux.go b/staging/src/k8s.io/apimachinery/pkg/watch/mux.go index 734ac1411f6..d51f9567e42 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/mux.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/mux.go @@ -46,7 +46,7 @@ type Broadcaster struct { distributing sync.WaitGroup // incomingBlock allows us to ensure we don't race and end up sending events - // to a closed channel following a brodcaster shutdown. + // to a closed channel following a broadcaster shutdown. incomingBlock sync.Mutex incoming chan Event stopped chan struct{} @@ -115,6 +115,8 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object { // won't ever see that event, and will always see any event after they are // added. func (m *Broadcaster) blockQueue(f func()) { + m.incomingBlock.Lock() + defer m.incomingBlock.Unlock() select { case <-m.stopped: return @@ -252,8 +254,6 @@ func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, // have received the data yet as it can remain sitting in the buffered // channel. It will block until the broadcaster stop request is actually executed func (m *Broadcaster) Shutdown() { - m.incomingBlock.Lock() - defer m.incomingBlock.Unlock() m.blockQueue(func() { close(m.stopped) close(m.incoming)