mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #91602 from sxllwx/lock_free_broadcaster
Lock-free broadcaster
This commit is contained in:
commit
16bd0bcfd9
@ -40,15 +40,12 @@ const incomingQueueLength = 25
|
|||||||
// Broadcaster distributes event notifications among any number of watchers. Every event
|
// Broadcaster distributes event notifications among any number of watchers. Every event
|
||||||
// is delivered to every watcher.
|
// is delivered to every watcher.
|
||||||
type Broadcaster struct {
|
type Broadcaster struct {
|
||||||
// TODO: see if this lock is needed now that new watchers go through
|
|
||||||
// the incoming channel.
|
|
||||||
lock sync.Mutex
|
|
||||||
|
|
||||||
watchers map[int64]*broadcasterWatcher
|
watchers map[int64]*broadcasterWatcher
|
||||||
nextWatcher int64
|
nextWatcher int64
|
||||||
distributing sync.WaitGroup
|
distributing sync.WaitGroup
|
||||||
|
|
||||||
incoming chan Event
|
incoming chan Event
|
||||||
|
stopped chan struct{}
|
||||||
|
|
||||||
// How large to make watcher's channel.
|
// How large to make watcher's channel.
|
||||||
watchQueueLength int
|
watchQueueLength int
|
||||||
@ -68,6 +65,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
|
|||||||
m := &Broadcaster{
|
m := &Broadcaster{
|
||||||
watchers: map[int64]*broadcasterWatcher{},
|
watchers: map[int64]*broadcasterWatcher{},
|
||||||
incoming: make(chan Event, incomingQueueLength),
|
incoming: make(chan Event, incomingQueueLength),
|
||||||
|
stopped: make(chan struct{}),
|
||||||
watchQueueLength: queueLength,
|
watchQueueLength: queueLength,
|
||||||
fullChannelBehavior: fullChannelBehavior,
|
fullChannelBehavior: fullChannelBehavior,
|
||||||
}
|
}
|
||||||
@ -96,10 +94,15 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
|
|||||||
// The purpose of this terrible hack is so that watchers added after an event
|
// The purpose of this terrible hack is so that watchers added after an event
|
||||||
// won't ever see that event, and will always see any event after they are
|
// won't ever see that event, and will always see any event after they are
|
||||||
// added.
|
// added.
|
||||||
func (b *Broadcaster) blockQueue(f func()) {
|
func (m *Broadcaster) blockQueue(f func()) {
|
||||||
|
select {
|
||||||
|
case <-m.stopped:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
b.incoming <- Event{
|
m.incoming <- Event{
|
||||||
Type: internalRunFunctionMarker,
|
Type: internalRunFunctionMarker,
|
||||||
Object: functionFakeRuntimeObject(func() {
|
Object: functionFakeRuntimeObject(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
@ -111,12 +114,11 @@ func (b *Broadcaster) blockQueue(f func()) {
|
|||||||
|
|
||||||
// Watch adds a new watcher to the list and returns an Interface for it.
|
// Watch adds a new watcher to the list and returns an Interface for it.
|
||||||
// Note: new watchers will only receive new events. They won't get an entire history
|
// Note: new watchers will only receive new events. They won't get an entire history
|
||||||
// of previous events.
|
// of previous events. It will block until the watcher is actually added to the
|
||||||
|
// broadcaster.
|
||||||
func (m *Broadcaster) Watch() Interface {
|
func (m *Broadcaster) Watch() Interface {
|
||||||
var w *broadcasterWatcher
|
var w *broadcasterWatcher
|
||||||
m.blockQueue(func() {
|
m.blockQueue(func() {
|
||||||
m.lock.Lock()
|
|
||||||
defer m.lock.Unlock()
|
|
||||||
id := m.nextWatcher
|
id := m.nextWatcher
|
||||||
m.nextWatcher++
|
m.nextWatcher++
|
||||||
w = &broadcasterWatcher{
|
w = &broadcasterWatcher{
|
||||||
@ -127,18 +129,22 @@ func (m *Broadcaster) Watch() Interface {
|
|||||||
}
|
}
|
||||||
m.watchers[id] = w
|
m.watchers[id] = w
|
||||||
})
|
})
|
||||||
|
if w == nil {
|
||||||
|
// The panic here is to be consistent with the previous interface behavior
|
||||||
|
// we are willing to re-evaluate in the future.
|
||||||
|
panic("broadcaster already stopped")
|
||||||
|
}
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
|
// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
|
||||||
// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
|
// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
|
||||||
// The returned watch will have a queue length that is at least large enough to accommodate
|
// The returned watch will have a queue length that is at least large enough to accommodate
|
||||||
// all of the items in queuedEvents.
|
// all of the items in queuedEvents. It will block until the watcher is actually added to
|
||||||
|
// the broadcaster.
|
||||||
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
|
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
|
||||||
var w *broadcasterWatcher
|
var w *broadcasterWatcher
|
||||||
m.blockQueue(func() {
|
m.blockQueue(func() {
|
||||||
m.lock.Lock()
|
|
||||||
defer m.lock.Unlock()
|
|
||||||
id := m.nextWatcher
|
id := m.nextWatcher
|
||||||
m.nextWatcher++
|
m.nextWatcher++
|
||||||
length := m.watchQueueLength
|
length := m.watchQueueLength
|
||||||
@ -156,26 +162,29 @@ func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
|
|||||||
w.result <- e
|
w.result <- e
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
if w == nil {
|
||||||
|
// The panic here is to be consistent with the previous interface behavior
|
||||||
|
// we are willing to re-evaluate in the future.
|
||||||
|
panic("broadcaster already stopped")
|
||||||
|
}
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
// stopWatching stops the given watcher and removes it from the list.
|
// stopWatching stops the given watcher and removes it from the list.
|
||||||
func (m *Broadcaster) stopWatching(id int64) {
|
func (m *Broadcaster) stopWatching(id int64) {
|
||||||
m.lock.Lock()
|
m.blockQueue(func() {
|
||||||
defer m.lock.Unlock()
|
w, ok := m.watchers[id]
|
||||||
w, ok := m.watchers[id]
|
if !ok {
|
||||||
if !ok {
|
// No need to do anything, it's already been removed from the list.
|
||||||
// No need to do anything, it's already been removed from the list.
|
return
|
||||||
return
|
}
|
||||||
}
|
delete(m.watchers, id)
|
||||||
delete(m.watchers, id)
|
close(w.result)
|
||||||
close(w.result)
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeAll disconnects all watchers (presumably in response to a Shutdown call).
|
// closeAll disconnects all watchers (presumably in response to a Shutdown call).
|
||||||
func (m *Broadcaster) closeAll() {
|
func (m *Broadcaster) closeAll() {
|
||||||
m.lock.Lock()
|
|
||||||
defer m.lock.Unlock()
|
|
||||||
for _, w := range m.watchers {
|
for _, w := range m.watchers {
|
||||||
close(w.result)
|
close(w.result)
|
||||||
}
|
}
|
||||||
@ -194,9 +203,12 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
|
|||||||
// until all events have been distributed through the outbound channels. Note
|
// until all events have been distributed through the outbound channels. Note
|
||||||
// that since they can be buffered, this means that the watchers might not
|
// that since they can be buffered, this means that the watchers might not
|
||||||
// have received the data yet as it can remain sitting in the buffered
|
// have received the data yet as it can remain sitting in the buffered
|
||||||
// channel.
|
// channel. It will block until the broadcaster stop request is actually executed
|
||||||
func (m *Broadcaster) Shutdown() {
|
func (m *Broadcaster) Shutdown() {
|
||||||
close(m.incoming)
|
m.blockQueue(func() {
|
||||||
|
close(m.stopped)
|
||||||
|
close(m.incoming)
|
||||||
|
})
|
||||||
m.distributing.Wait()
|
m.distributing.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -217,8 +229,6 @@ func (m *Broadcaster) loop() {
|
|||||||
|
|
||||||
// distribute sends event to all watchers. Blocking.
|
// distribute sends event to all watchers. Blocking.
|
||||||
func (m *Broadcaster) distribute(event Event) {
|
func (m *Broadcaster) distribute(event Event) {
|
||||||
m.lock.Lock()
|
|
||||||
defer m.lock.Unlock()
|
|
||||||
if m.fullChannelBehavior == DropIfChannelFull {
|
if m.fullChannelBehavior == DropIfChannelFull {
|
||||||
for _, w := range m.watchers {
|
for _, w := range m.watchers {
|
||||||
select {
|
select {
|
||||||
@ -252,6 +262,7 @@ func (mw *broadcasterWatcher) ResultChan() <-chan Event {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops watching and removes mw from its list.
|
// Stop stops watching and removes mw from its list.
|
||||||
|
// It will block until the watcher stop request is actually executed
|
||||||
func (mw *broadcasterWatcher) Stop() {
|
func (mw *broadcasterWatcher) Stop() {
|
||||||
mw.stop.Do(func() {
|
mw.stop.Do(func() {
|
||||||
close(mw.stopped)
|
close(mw.stopped)
|
||||||
|
@ -174,3 +174,53 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
|
|||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkBroadCaster(b *testing.B) {
|
||||||
|
event1 := Event{Type: Added, Object: &myType{"foo", "hello world 1"}}
|
||||||
|
m := NewBroadcaster(0, WaitIfChannelFull)
|
||||||
|
b.ResetTimer()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
m.Action(event1.Type, event1.Object)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBroadcasterWatchAfterShutdown(t *testing.T) {
|
||||||
|
event1 := Event{Type: Added, Object: &myType{"foo", "hello world 1"}}
|
||||||
|
event2 := Event{Type: Added, Object: &myType{"bar", "hello world 2"}}
|
||||||
|
|
||||||
|
m := NewBroadcaster(0, WaitIfChannelFull)
|
||||||
|
m.Shutdown()
|
||||||
|
|
||||||
|
watch := func() {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err == nil {
|
||||||
|
t.Error("should cause panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
m.Watch()
|
||||||
|
}
|
||||||
|
watch()
|
||||||
|
|
||||||
|
watchWithPrefix := func() {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err == nil {
|
||||||
|
t.Error("should cause panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
m.WatchWithPrefix([]Event{event1, event2})
|
||||||
|
}
|
||||||
|
watchWithPrefix()
|
||||||
|
|
||||||
|
action := func() {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err == nil {
|
||||||
|
t.Error("should cause panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
m.Action(event1.Type, event1.Object)
|
||||||
|
}
|
||||||
|
action()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user