mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Fix additional panic
Ensure we take the incomingBlock Lock in blockQueue to ensure there is not any possiblity of sending on a closed incoming channel. Signed-off-by: Andrew Stoycos <astoycos@redhat.com>
This commit is contained in:
parent
2d614a182c
commit
b7a37f5b3d
@ -46,7 +46,7 @@ type Broadcaster struct {
|
|||||||
distributing sync.WaitGroup
|
distributing sync.WaitGroup
|
||||||
|
|
||||||
// incomingBlock allows us to ensure we don't race and end up sending events
|
// incomingBlock allows us to ensure we don't race and end up sending events
|
||||||
// to a closed channel following a brodcaster shutdown.
|
// to a closed channel following a broadcaster shutdown.
|
||||||
incomingBlock sync.Mutex
|
incomingBlock sync.Mutex
|
||||||
incoming chan Event
|
incoming chan Event
|
||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
@ -115,6 +115,8 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
|
|||||||
// won't ever see that event, and will always see any event after they are
|
// won't ever see that event, and will always see any event after they are
|
||||||
// added.
|
// added.
|
||||||
func (m *Broadcaster) blockQueue(f func()) {
|
func (m *Broadcaster) blockQueue(f func()) {
|
||||||
|
m.incomingBlock.Lock()
|
||||||
|
defer m.incomingBlock.Unlock()
|
||||||
select {
|
select {
|
||||||
case <-m.stopped:
|
case <-m.stopped:
|
||||||
return
|
return
|
||||||
@ -252,8 +254,6 @@ func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool,
|
|||||||
// have received the data yet as it can remain sitting in the buffered
|
// have received the data yet as it can remain sitting in the buffered
|
||||||
// channel. It will block until the broadcaster stop request is actually executed
|
// channel. It will block until the broadcaster stop request is actually executed
|
||||||
func (m *Broadcaster) Shutdown() {
|
func (m *Broadcaster) Shutdown() {
|
||||||
m.incomingBlock.Lock()
|
|
||||||
defer m.incomingBlock.Unlock()
|
|
||||||
m.blockQueue(func() {
|
m.blockQueue(func() {
|
||||||
close(m.stopped)
|
close(m.stopped)
|
||||||
close(m.incoming)
|
close(m.incoming)
|
||||||
|
Loading…
Reference in New Issue
Block a user