From 702a6f96b45318254728e034c84ba5ce4effe6a7 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Sat, 10 Jan 2015 00:58:07 +0000 Subject: [PATCH 1/4] Improve client recording of events such that clients are (1) less likely to drop events if the master is unavailable (2) less likely to have goroutines block while trying to record an event. Done as part of #3163 to ensure that minions operate well even while the master is down. --- pkg/client/record/event.go | 113 ++++++++++++++++++++++---------- pkg/client/record/event_test.go | 60 ++++++++++++++++- 2 files changed, 135 insertions(+), 38 deletions(-) diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 91b93cdc48d..191a991cc7d 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -18,6 +18,7 @@ package record import ( "fmt" + "math" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -25,13 +26,22 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) -// retryEventSleep is the time between record failures to retry. Available for test alteration. -var retryEventSleep = 1 * time.Second +const ( + maxQueuedEvents = 1000 + maxTriesPerEvent = 10 +) + +var ( + minSleep = float64(1 * time.Second) + maxSleep = float64(15 * time.Second) + backoffExp = 1.5 +) // EventRecorder knows how to store events (client.Client implements it.) // EventRecorder must respect the namespace that will be embedded in 'event'. @@ -46,49 +56,82 @@ type EventRecorder interface { // or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { + // Set up our own personal buffer of events so that we can clear out GetEvents' + // broadcast channel as quickly as possible to avoid causing the relatively more + // important event-producing goroutines from blocking while trying to insert events. + eventQueue := make(chan *api.Event, maxQueuedEvents) + + // Run a function in the background that grabs events off the queue and tries + // to record them, retrying as appropriate to try to avoid dropping any. + go func() { + defer util.HandleCrash() + for event := range eventQueue { + tries := 0 + for { + if recordEvent(recorder, event) { + break + } + tries++ + if tries >= maxTriesPerEvent { + glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + break + } + sleepDuration := time.Duration( + math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1)))) + time.Sleep(wait.Jitter(sleepDuration, 0.5)) + } + } + }() + + // Finally, kick off the watcher that takes events from the channel and puts them + // onto the queue. return GetEvents(func(event *api.Event) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event event = &eventCopy event.Source = source - try := 0 - for { - try++ - _, err := recorder.Create(event) - if err == nil { - break - } - // If we can't contact the server, then hold everything while we keep trying. - // Otherwise, something about the event is malformed and we should abandon it. - giveUp := false - switch err.(type) { - case *client.RequestConstructionError: - // We will construct the request the same next time, so don't keep trying. - giveUp = true - case *errors.StatusError: - // This indicates that the server understood and rejected our request. - giveUp = true - case *errors.UnexpectedObjectError: - // We don't expect this; it implies the server's response didn't match a - // known pattern. Go ahead and retry. - default: - // This case includes actual http transport errors. Go ahead and retry. - } - if giveUp { - glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err) - break - } - if try >= 3 { - glog.Errorf("Unable to write event '%#v': '%v' (retry limit exceeded!)", event, err) - break - } - glog.Errorf("Unable to write event: '%v' (will retry in 1 second)", err) - time.Sleep(retryEventSleep) + // Drop new events rather than old ones because the old ones may contain + // some information explaining why everything is so backed up. + if len(eventQueue) == maxQueuedEvents { + glog.Errorf("Unable to write event '%#v' (event buffer full!)", event) + } else { + eventQueue <- event } }) } +// recordEvent attempts to write event to recorder. It returns true if the event +// was successfully recorded or discarded, false if it should be retried. +func recordEvent(recorder EventRecorder, event *api.Event) bool { + _, err := recorder.Create(event) + if err == nil { + return true + } + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + giveUp := false + switch err.(type) { + case *client.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + giveUp = true + case *errors.StatusError: + // This indicates that the server understood and rejected our request. + giveUp = true + case *errors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + if giveUp { + glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err) + return true + } + glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return false +} + // StartLogging just logs local events, using the given logging function. The // return value can be ignored or used to stop logging, if desired. func StartLogging(logf func(format string, args ...interface{})) watch.Interface { diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index 0d64ebaa7a6..e749cd53ffc 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -19,6 +19,7 @@ package record import ( "fmt" "reflect" + "strconv" "strings" "testing" "time" @@ -31,7 +32,9 @@ import ( ) func init() { - retryEventSleep = 1 * time.Microsecond + // Don't bother sleeping between retries. + minSleep = 0 + maxSleep = 0 } type testEventRecorder struct { @@ -192,12 +195,12 @@ func TestWriteEventError(t *testing.T) { }, "retry1": { timesToSendError: 1000, - attemptsWanted: 3, + attemptsWanted: 10, err: &errors.UnexpectedObjectError{}, }, "retry2": { timesToSendError: 1000, - attemptsWanted: 3, + attemptsWanted: 10, err: fmt.Errorf("A weird error"), }, "succeedEventually": { @@ -242,3 +245,54 @@ func TestWriteEventError(t *testing.T) { } } } + +func TestLotsOfEvents(t *testing.T) { + recorderCalled := make(chan struct{}) + loggerCalled := make(chan struct{}) + + // Fail each event a few times to ensure there's some load on the tested code. + var counts [1000]int + testEvents := testEventRecorder{ + OnEvent: func(event *api.Event) (*api.Event, error) { + num, err := strconv.Atoi(event.Message) + if err != nil { + t.Error(err) + return event, nil + } + counts[num]++ + if counts[num] < 5 { + return nil, fmt.Errorf("fake error") + } + recorderCalled <- struct{}{} + return event, nil + }, + } + recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"}) + logger := StartLogging(func(formatter string, args ...interface{}) { + loggerCalled <- struct{}{} + }) + + ref := &api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "bar", + APIVersion: "v1beta1", + } + for i := 0; i < maxQueuedEvents; i++ { + go Event(ref, "Status", "Reason", strconv.Itoa(i)) + } + // Make sure no events were dropped by either of the listeners. + for i := 0; i < maxQueuedEvents; i++ { + <-recorderCalled + <-loggerCalled + } + // Make sure that every event was attempted 5 times + for i := 0; i < maxQueuedEvents; i++ { + if counts[i] < 5 { + t.Errorf("Only attempted to record event '%d' %d times.", i, counts[i]) + } + } + recorder.Stop() + logger.Stop() +} From be6b1cf0e299866e435258788f0c32b99cffcb61 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Tue, 13 Jan 2015 01:40:17 +0000 Subject: [PATCH 2/4] Push the full channel logic into the implementation of the broadcaster in watch/mux.go rather than being in the client event recording code. --- pkg/client/record/event.go | 58 ++++++++-------------------- pkg/registry/registrytest/generic.go | 2 +- pkg/registry/registrytest/pod.go | 2 +- pkg/watch/mux.go | 55 +++++++++++++++++++------- pkg/watch/mux_test.go | 6 +-- 5 files changed, 64 insertions(+), 59 deletions(-) diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 191a991cc7d..77291b036f0 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -32,10 +32,7 @@ import ( "github.com/golang/glog" ) -const ( - maxQueuedEvents = 1000 - maxTriesPerEvent = 10 -) +const maxTriesPerEvent = 10 var ( minSleep = float64(1 * time.Second) @@ -56,47 +53,26 @@ type EventRecorder interface { // or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { - // Set up our own personal buffer of events so that we can clear out GetEvents' - // broadcast channel as quickly as possible to avoid causing the relatively more - // important event-producing goroutines from blocking while trying to insert events. - eventQueue := make(chan *api.Event, maxQueuedEvents) - - // Run a function in the background that grabs events off the queue and tries - // to record them, retrying as appropriate to try to avoid dropping any. - go func() { - defer util.HandleCrash() - for event := range eventQueue { - tries := 0 - for { - if recordEvent(recorder, event) { - break - } - tries++ - if tries >= maxTriesPerEvent { - glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) - break - } - sleepDuration := time.Duration( - math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1)))) - time.Sleep(wait.Jitter(sleepDuration, 0.5)) - } - } - }() - - // Finally, kick off the watcher that takes events from the channel and puts them - // onto the queue. return GetEvents(func(event *api.Event) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event event = &eventCopy event.Source = source - // Drop new events rather than old ones because the old ones may contain - // some information explaining why everything is so backed up. - if len(eventQueue) == maxQueuedEvents { - glog.Errorf("Unable to write event '%#v' (event buffer full!)", event) - } else { - eventQueue <- event + + tries := 0 + for { + if recordEvent(recorder, event) { + break + } + tries++ + if tries >= maxTriesPerEvent { + glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + break + } + sleepDuration := time.Duration( + math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1)))) + time.Sleep(wait.Jitter(sleepDuration, 0.5)) } }) } @@ -163,9 +139,9 @@ func GetEvents(f func(*api.Event)) watch.Interface { return w } -const queueLen = 1000 +const maxQueuedEvents = 1000 -var events = watch.NewBroadcaster(queueLen) +var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) // Event constructs an event from the given information and puts it in the queue for sending. // 'object' is the object this event is about. Event will make a reference-- or you may also diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go index 28a0d66b5eb..46776a786ec 100644 --- a/pkg/registry/registrytest/generic.go +++ b/pkg/registry/registrytest/generic.go @@ -39,7 +39,7 @@ type GenericRegistry struct { func NewGeneric(list runtime.Object) *GenericRegistry { return &GenericRegistry{ ObjectList: list, - Broadcaster: watch.NewBroadcaster(0), + Broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull), } } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 6d60d30f62b..571b61074b2 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -36,7 +36,7 @@ type PodRegistry struct { func NewPodRegistry(pods *api.PodList) *PodRegistry { return &PodRegistry{ Pods: pods, - broadcaster: watch.NewBroadcaster(0), + broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull), } } diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index e4d8bef1605..3a10f08ca63 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -22,6 +22,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) +// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch +// channel is full. +type FullChannelBehavior int + +const ( + WaitIfChannelFull = iota + DropIfChannelFull = iota +) + // Broadcaster distributes event notifications among any number of watchers. Every event // is delivered to every watcher. type Broadcaster struct { @@ -31,17 +40,27 @@ type Broadcaster struct { nextWatcher int64 incoming chan Event + + // How large to make watcher's channel. + watchQueueLength int + // If one of the watch channels is full, don't wait for it to become empty. + // Instead just deliver it to the watchers that do have space in their + // channels and move on to the next event. + // It's more fair to do this on a per-watcher basis than to do it on the + // "incoming" channel, which would allow one slow watcher to prevent all + // other watchers from getting new events. + fullChannelBehavior FullChannelBehavior } -// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue. -// When queueLength is 0, Action will block until any prior event has been -// completely distributed. It is guaranteed that events will be distibuted in the -// order in which they ocurr, but the order in which a single event is distributed -// among all of the watchers is unspecified. -func NewBroadcaster(queueLength int) *Broadcaster { +// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher. +// It is guaranteed that events will be distibuted in the order in which they ocur, +// but the order in which a single event is distributed among all of the watchers is unspecified. +func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { m := &Broadcaster{ - watchers: map[int64]*broadcasterWatcher{}, - incoming: make(chan Event, queueLength), + watchers: map[int64]*broadcasterWatcher{}, + incoming: make(chan Event), + watchQueueLength: queueLength, + fullChannelBehavior: fullChannelBehavior, } go m.loop() return m @@ -56,7 +75,7 @@ func (m *Broadcaster) Watch() Interface { id := m.nextWatcher m.nextWatcher++ w := &broadcasterWatcher{ - result: make(chan Event), + result: make(chan Event, m.watchQueueLength), stopped: make(chan struct{}), id: id, m: m, @@ -119,10 +138,20 @@ func (m *Broadcaster) loop() { func (m *Broadcaster) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() - for _, w := range m.watchers { - select { - case w.result <- event: - case <-w.stopped: + if m.fullChannelBehavior == DropIfChannelFull { + for _, w := range m.watchers { + select { + case w.result <- event: + case <-w.stopped: + default: // Don't block if the event can't be queued. + } + } + } else { + for _, w := range m.watchers { + select { + case w.result <- event: + case <-w.stopped: + } } } } diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 89584618717..2cc548c62fc 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -39,7 +39,7 @@ func TestBroadcaster(t *testing.T) { } // The broadcaster we're testing - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) // Add a bunch of watchers const testWatchers = 2 @@ -77,7 +77,7 @@ func TestBroadcaster(t *testing.T) { } func TestBroadcasterWatcherClose(t *testing.T) { - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) w := m.Watch() w2 := m.Watch() w.Stop() @@ -95,7 +95,7 @@ func TestBroadcasterWatcherClose(t *testing.T) { func TestBroadcasterWatcherStopDeadlock(t *testing.T) { done := make(chan bool) - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) go func(w0, w1 Interface) { // We know Broadcaster is in the distribute loop once one watcher receives // an event. Stop the other watcher while distribute is trying to From 3eaf362f8e7c2fc171998947f9c57328fd23015b Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Tue, 13 Jan 2015 19:13:24 +0000 Subject: [PATCH 3/4] Switch the client event recorder from exponential backoff to one random sleep on the first failed request followed by a constant amount on all subsequent consecutive failed requests. --- pkg/client/record/event.go | 21 ++++++++++----------- pkg/client/record/event_test.go | 8 +++----- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 77291b036f0..df73972b9b5 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -18,7 +18,7 @@ package record import ( "fmt" - "math" + "math/rand" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -26,19 +26,14 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) -const maxTriesPerEvent = 10 +const maxTriesPerEvent = 12 -var ( - minSleep = float64(1 * time.Second) - maxSleep = float64(15 * time.Second) - backoffExp = 1.5 -) +var sleepDuration = time.Duration(10 * time.Second) // EventRecorder knows how to store events (client.Client implements it.) // EventRecorder must respect the namespace that will be embedded in 'event'. @@ -70,9 +65,13 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) break } - sleepDuration := time.Duration( - math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1)))) - time.Sleep(wait.Jitter(sleepDuration, 0.5)) + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) + } else { + time.Sleep(sleepDuration) + } } }) } diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index e749cd53ffc..24a688838eb 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -22,7 +22,6 @@ import ( "strconv" "strings" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -33,8 +32,7 @@ import ( func init() { // Don't bother sleeping between retries. - minSleep = 0 - maxSleep = 0 + sleepDuration = 0 } type testEventRecorder struct { @@ -195,12 +193,12 @@ func TestWriteEventError(t *testing.T) { }, "retry1": { timesToSendError: 1000, - attemptsWanted: 10, + attemptsWanted: 12, err: &errors.UnexpectedObjectError{}, }, "retry2": { timesToSendError: 1000, - attemptsWanted: 10, + attemptsWanted: 12, err: fmt.Errorf("A weird error"), }, "succeedEventually": { From 90e1d58fa6f3af1e1500b33084878f318b0372f9 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Wed, 14 Jan 2015 04:36:29 +0000 Subject: [PATCH 4/4] Add a unit test for watch.Broadcaster DropIfChannelFull and a couple small fixes --- pkg/client/record/event.go | 7 ++++-- pkg/watch/mux.go | 11 ++++++--- pkg/watch/mux_test.go | 49 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 5 deletions(-) diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index df73972b9b5..dc9bda1b145 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -33,7 +33,7 @@ import ( const maxTriesPerEvent = 12 -var sleepDuration = time.Duration(10 * time.Second) +var sleepDuration = 10 * time.Second // EventRecorder knows how to store events (client.Client implements it.) // EventRecorder must respect the namespace that will be embedded in 'event'. @@ -48,6 +48,9 @@ type EventRecorder interface { // or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { + // The default math/rand package functions aren't thread safe, so create a + // new Rand object for each StartRecording call. + randGen := rand.New(rand.NewSource(time.Now().UnixNano())) return GetEvents(func(event *api.Event) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. @@ -68,7 +71,7 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf // Randomize the first sleep so that various clients won't all be // synced up if the master goes down. if tries == 1 { - time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) + time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) } else { time.Sleep(sleepDuration) } diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 3a10f08ca63..ed5c09e5129 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -27,10 +27,15 @@ import ( type FullChannelBehavior int const ( - WaitIfChannelFull = iota - DropIfChannelFull = iota + WaitIfChannelFull FullChannelBehavior = iota + DropIfChannelFull ) +// Buffer the incoming queue a little bit even though it should rarely ever accumulate +// anything, just in case a few events are received in such a short window that +// Broadcaster can't move them onto the watchers' queues fast enough. +const incomingQueueLength = 25 + // Broadcaster distributes event notifications among any number of watchers. Every event // is delivered to every watcher. type Broadcaster struct { @@ -58,7 +63,7 @@ type Broadcaster struct { func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { m := &Broadcaster{ watchers: map[int64]*broadcasterWatcher{}, - incoming: make(chan Event), + incoming: make(chan Event, incomingQueueLength), watchQueueLength: queueLength, fullChannelBehavior: fullChannelBehavior, } diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 2cc548c62fc..a662605efbe 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -116,3 +116,52 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) { } m.Shutdown() } + +func TestBroadcasterDropIfChannelFull(t *testing.T) { + m := NewBroadcaster(1, DropIfChannelFull) + + event1 := Event{Added, &myType{"foo", "hello world 1"}} + event2 := Event{Added, &myType{"bar", "hello world 2"}} + + // Add a couple watchers + const testWatchers = 2 + watches := make([]Interface, testWatchers) + for i := 0; i < testWatchers; i++ { + watches[i] = m.Watch() + } + + // Send a couple events before closing the broadcast channel. + t.Log("Sending event 1") + m.Action(event1.Type, event1.Object) + t.Log("Sending event 2") + m.Action(event2.Type, event2.Object) + m.Shutdown() + + // Pull events from the queue. + wg := sync.WaitGroup{} + wg.Add(testWatchers) + for i := 0; i < testWatchers; i++ { + // Verify that each watcher only gets the first event because its watch + // queue of length one was full from the first one. + go func(watcher int, w Interface) { + defer wg.Done() + e1, ok := <-w.ResultChan() + if !ok { + t.Error("Watcher %v failed to retrieve first event.") + return + } + if e, a := event1, e1; !reflect.DeepEqual(e, a) { + t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", + watcher, e.Type, e.Object, a.Type, a.Object) + } else { + t.Logf("Got (%v, %#v)", e1.Type, e1.Object) + } + e2, ok := <-w.ResultChan() + if ok { + t.Error("Watcher %v received second event (%v, %#v) even though it shouldn't have.", + watcher, e2.Type, e2.Object) + } + }(i, watches[i]) + } + wg.Wait() +}