mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 18:31:15 +00:00
commit
efb62b3538
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package watch
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -44,8 +45,11 @@ type Broadcaster struct {
|
||||
nextWatcher int64
|
||||
distributing sync.WaitGroup
|
||||
|
||||
incoming chan Event
|
||||
stopped chan struct{}
|
||||
// incomingBlock allows us to ensure we don't race and end up sending events
|
||||
// to a closed channel following a brodcaster shutdown.
|
||||
incomingBlock sync.Mutex
|
||||
incoming chan Event
|
||||
stopped chan struct{}
|
||||
|
||||
// How large to make watcher's channel.
|
||||
watchQueueLength int
|
||||
@ -132,7 +136,7 @@ func (m *Broadcaster) blockQueue(f func()) {
|
||||
// Note: new watchers will only receive new events. They won't get an entire history
|
||||
// of previous events. It will block until the watcher is actually added to the
|
||||
// broadcaster.
|
||||
func (m *Broadcaster) Watch() Interface {
|
||||
func (m *Broadcaster) Watch() (Interface, error) {
|
||||
var w *broadcasterWatcher
|
||||
m.blockQueue(func() {
|
||||
id := m.nextWatcher
|
||||
@ -146,11 +150,9 @@ func (m *Broadcaster) Watch() Interface {
|
||||
m.watchers[id] = w
|
||||
})
|
||||
if w == nil {
|
||||
// The panic here is to be consistent with the previous interface behavior
|
||||
// we are willing to re-evaluate in the future.
|
||||
panic("broadcaster already stopped")
|
||||
return nil, fmt.Errorf("broadcaster already stopped")
|
||||
}
|
||||
return w
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
|
||||
@ -158,7 +160,7 @@ func (m *Broadcaster) Watch() Interface {
|
||||
// The returned watch will have a queue length that is at least large enough to accommodate
|
||||
// all of the items in queuedEvents. It will block until the watcher is actually added to
|
||||
// the broadcaster.
|
||||
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
|
||||
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) (Interface, error) {
|
||||
var w *broadcasterWatcher
|
||||
m.blockQueue(func() {
|
||||
id := m.nextWatcher
|
||||
@ -179,11 +181,9 @@ func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
|
||||
}
|
||||
})
|
||||
if w == nil {
|
||||
// The panic here is to be consistent with the previous interface behavior
|
||||
// we are willing to re-evaluate in the future.
|
||||
panic("broadcaster already stopped")
|
||||
return nil, fmt.Errorf("broadcaster already stopped")
|
||||
}
|
||||
return w
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// stopWatching stops the given watcher and removes it from the list.
|
||||
@ -210,19 +210,38 @@ func (m *Broadcaster) closeAll() {
|
||||
}
|
||||
|
||||
// Action distributes the given event among all watchers.
|
||||
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
|
||||
func (m *Broadcaster) Action(action EventType, obj runtime.Object) error {
|
||||
m.incomingBlock.Lock()
|
||||
defer m.incomingBlock.Unlock()
|
||||
select {
|
||||
case <-m.stopped:
|
||||
return fmt.Errorf("broadcaster already stopped")
|
||||
default:
|
||||
}
|
||||
|
||||
m.incoming <- Event{action, obj}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Action distributes the given event among all watchers, or drops it on the floor
|
||||
// if too many incoming actions are queued up. Returns true if the action was sent,
|
||||
// false if dropped.
|
||||
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
|
||||
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) {
|
||||
m.incomingBlock.Lock()
|
||||
defer m.incomingBlock.Unlock()
|
||||
|
||||
// Ensure that if the broadcaster is stopped we do not send events to it.
|
||||
select {
|
||||
case <-m.stopped:
|
||||
return false, fmt.Errorf("broadcaster already stopped")
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case m.incoming <- Event{action, obj}:
|
||||
return true
|
||||
return true, nil
|
||||
default:
|
||||
return false
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -233,6 +252,8 @@ func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
|
||||
// have received the data yet as it can remain sitting in the buffered
|
||||
// channel. It will block until the broadcaster stop request is actually executed
|
||||
func (m *Broadcaster) Shutdown() {
|
||||
m.incomingBlock.Lock()
|
||||
defer m.incomingBlock.Unlock()
|
||||
m.blockQueue(func() {
|
||||
close(m.stopped)
|
||||
close(m.incoming)
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@ -58,6 +59,10 @@ func TestBroadcaster(t *testing.T) {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(testWatchers)
|
||||
for i := 0; i < testWatchers; i++ {
|
||||
w, err := m.Watch()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||
}
|
||||
// Verify that each watcher gets the events in the correct order
|
||||
go func(watcher int, w Interface) {
|
||||
tableLine := 0
|
||||
@ -75,7 +80,7 @@ func TestBroadcaster(t *testing.T) {
|
||||
tableLine++
|
||||
}
|
||||
wg.Done()
|
||||
}(i, m.Watch())
|
||||
}(i, w)
|
||||
}
|
||||
|
||||
for i, item := range table {
|
||||
@ -90,8 +95,14 @@ func TestBroadcaster(t *testing.T) {
|
||||
|
||||
func TestBroadcasterWatcherClose(t *testing.T) {
|
||||
m := NewBroadcaster(0, WaitIfChannelFull)
|
||||
w := m.Watch()
|
||||
w2 := m.Watch()
|
||||
w, err := m.Watch()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||
}
|
||||
w2, err := m.Watch()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||
}
|
||||
w.Stop()
|
||||
m.Shutdown()
|
||||
if _, open := <-w.ResultChan(); open {
|
||||
@ -108,6 +119,14 @@ func TestBroadcasterWatcherClose(t *testing.T) {
|
||||
func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
|
||||
done := make(chan bool)
|
||||
m := NewBroadcaster(0, WaitIfChannelFull)
|
||||
w, err := m.Watch()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||
}
|
||||
w2, err := m.Watch()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||
}
|
||||
go func(w0, w1 Interface) {
|
||||
// We know Broadcaster is in the distribute loop once one watcher receives
|
||||
// an event. Stop the other watcher while distribute is trying to
|
||||
@ -119,7 +138,7 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
|
||||
w0.Stop()
|
||||
}
|
||||
close(done)
|
||||
}(m.Watch(), m.Watch())
|
||||
}(w, w2)
|
||||
m.Action(Added, &myType{})
|
||||
select {
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
@ -137,8 +156,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
|
||||
|
||||
// Add a couple watchers
|
||||
watches := make([]Interface, 2)
|
||||
var err error
|
||||
for i := range watches {
|
||||
watches[i] = m.Watch()
|
||||
watches[i], err = m.Watch()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Send a couple events before closing the broadcast channel.
|
||||
@ -194,33 +217,32 @@ func TestBroadcasterWatchAfterShutdown(t *testing.T) {
|
||||
m := NewBroadcaster(0, WaitIfChannelFull)
|
||||
m.Shutdown()
|
||||
|
||||
watch := func() {
|
||||
defer func() {
|
||||
if err := recover(); err == nil {
|
||||
t.Error("should cause panic")
|
||||
}
|
||||
}()
|
||||
m.Watch()
|
||||
}
|
||||
watch()
|
||||
_, err := m.Watch()
|
||||
assert.EqualError(t, err, "broadcaster already stopped", "Watch should report error id broadcaster is shutdown")
|
||||
|
||||
watchWithPrefix := func() {
|
||||
defer func() {
|
||||
if err := recover(); err == nil {
|
||||
t.Error("should cause panic")
|
||||
}
|
||||
}()
|
||||
m.WatchWithPrefix([]Event{event1, event2})
|
||||
}
|
||||
watchWithPrefix()
|
||||
|
||||
action := func() {
|
||||
defer func() {
|
||||
if err := recover(); err == nil {
|
||||
t.Error("should cause panic")
|
||||
}
|
||||
}()
|
||||
m.Action(event1.Type, event1.Object)
|
||||
}
|
||||
action()
|
||||
_, err = m.WatchWithPrefix([]Event{event1, event2})
|
||||
assert.EqualError(t, err, "broadcaster already stopped", "WatchWithPrefix should report error id broadcaster is shutdown")
|
||||
}
|
||||
|
||||
func TestBroadcasterSendEventAfterShutdown(t *testing.T) {
|
||||
m := NewBroadcaster(1, DropIfChannelFull)
|
||||
|
||||
event := Event{Type: Added, Object: &myType{"foo", "hello world"}}
|
||||
|
||||
// Add a couple watchers
|
||||
watches := make([]Interface, 2)
|
||||
for i := range watches {
|
||||
watches[i], _ = m.Watch()
|
||||
}
|
||||
m.Shutdown()
|
||||
|
||||
// Send a couple events after closing the broadcast channel.
|
||||
t.Log("Sending event")
|
||||
|
||||
err := m.Action(event.Type, event.Object)
|
||||
assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown")
|
||||
|
||||
sendOnClosed, err := m.ActionOrDrop(event.Type, event.Object)
|
||||
assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown")
|
||||
assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown")
|
||||
}
|
||||
|
@ -275,11 +275,11 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac
|
||||
// clients).
|
||||
changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()})
|
||||
}
|
||||
return f.Broadcaster.WatchWithPrefix(changes), nil
|
||||
return f.Broadcaster.WatchWithPrefix(changes)
|
||||
} else if rc > f.lastRV {
|
||||
return nil, errors.New("resource version in the future not supported by this fake")
|
||||
}
|
||||
return f.Broadcaster.Watch(), nil
|
||||
return f.Broadcaster.Watch()
|
||||
}
|
||||
|
||||
// Shutdown closes the underlying broadcaster, waiting for events to be
|
||||
|
@ -307,7 +307,15 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func
|
||||
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
||||
// The return value is used to stop recording
|
||||
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
|
||||
watcher := e.Watch()
|
||||
watcher, err := e.Watch()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||
// TODO: Rewrite the function signature to return an error, for
|
||||
// now just return a no-op function
|
||||
return func() {
|
||||
klog.Error("The event watcher failed to start")
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
for {
|
||||
|
@ -298,7 +298,10 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc
|
||||
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
||||
// The return value can be ignored or used to stop recording, if desired.
|
||||
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
|
||||
watcher := e.Watch()
|
||||
watcher, err := e.Watch()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
|
||||
}
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
for watchEvent := range watcher.ResultChan() {
|
||||
@ -346,7 +349,12 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
|
||||
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
|
||||
// and log an error if that happens (we've configured the broadcaster to drop
|
||||
// outgoing events anyway).
|
||||
if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
|
||||
sent, err := recorder.ActionOrDrop(watch.Added, event)
|
||||
if err != nil {
|
||||
klog.Errorf("unable to record event: %v (will not retry!)", err)
|
||||
return
|
||||
}
|
||||
if !sent {
|
||||
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user