Add a unit test for watch.Broadcaster DropIfChannelFull and a couple small fixes

This commit is contained in:
Alex Robinson 2015-01-14 04:36:29 +00:00
parent 3eaf362f8e
commit 90e1d58fa6
3 changed files with 62 additions and 5 deletions

View File

@ -33,7 +33,7 @@ import (
const maxTriesPerEvent = 12
var sleepDuration = time.Duration(10 * time.Second)
var sleepDuration = 10 * time.Second
// EventRecorder knows how to store events (client.Client implements it.)
// EventRecorder must respect the namespace that will be embedded in 'event'.
@ -48,6 +48,9 @@ type EventRecorder interface {
// or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface {
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
return GetEvents(func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
@ -68,7 +71,7 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
} else {
time.Sleep(sleepDuration)
}

View File

@ -27,10 +27,15 @@ import (
type FullChannelBehavior int
const (
WaitIfChannelFull = iota
DropIfChannelFull = iota
WaitIfChannelFull FullChannelBehavior = iota
DropIfChannelFull
)
// Buffer the incoming queue a little bit even though it should rarely ever accumulate
// anything, just in case a few events are received in such a short window that
// Broadcaster can't move them onto the watchers' queues fast enough.
const incomingQueueLength = 25
// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Broadcaster struct {
@ -58,7 +63,7 @@ type Broadcaster struct {
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event),
incoming: make(chan Event, incomingQueueLength),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}

View File

@ -116,3 +116,52 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
}
m.Shutdown()
}
func TestBroadcasterDropIfChannelFull(t *testing.T) {
m := NewBroadcaster(1, DropIfChannelFull)
event1 := Event{Added, &myType{"foo", "hello world 1"}}
event2 := Event{Added, &myType{"bar", "hello world 2"}}
// Add a couple watchers
const testWatchers = 2
watches := make([]Interface, testWatchers)
for i := 0; i < testWatchers; i++ {
watches[i] = m.Watch()
}
// Send a couple events before closing the broadcast channel.
t.Log("Sending event 1")
m.Action(event1.Type, event1.Object)
t.Log("Sending event 2")
m.Action(event2.Type, event2.Object)
m.Shutdown()
// Pull events from the queue.
wg := sync.WaitGroup{}
wg.Add(testWatchers)
for i := 0; i < testWatchers; i++ {
// Verify that each watcher only gets the first event because its watch
// queue of length one was full from the first one.
go func(watcher int, w Interface) {
defer wg.Done()
e1, ok := <-w.ResultChan()
if !ok {
t.Error("Watcher %v failed to retrieve first event.")
return
}
if e, a := event1, e1; !reflect.DeepEqual(e, a) {
t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
watcher, e.Type, e.Object, a.Type, a.Object)
} else {
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
}
e2, ok := <-w.ResultChan()
if ok {
t.Error("Watcher %v received second event (%v, %#v) even though it shouldn't have.",
watcher, e2.Type, e2.Object)
}
}(i, watches[i])
}
wg.Wait()
}