diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 21747c7d621..d85306b8b71 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -18,6 +18,7 @@ package record import ( "fmt" + "math/rand" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -30,8 +31,9 @@ import ( "github.com/golang/glog" ) -// retryEventSleep is the time between record failures to retry. Available for test alteration. -var retryEventSleep = 1 * time.Second +const maxTriesPerEvent = 12 + +var sleepDuration = 10 * time.Second // EventRecorder knows how to store events (client.Client implements it.) // EventRecorder must respect the namespace that will be embedded in 'event'. @@ -46,49 +48,68 @@ type EventRecorder interface { // or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { + // The default math/rand package functions aren't thread safe, so create a + // new Rand object for each StartRecording call. + randGen := rand.New(rand.NewSource(time.Now().UnixNano())) return GetEvents(func(event *api.Event) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event event = &eventCopy event.Source = source - try := 0 + + tries := 0 for { - try++ - _, err := recorder.Create(event) - if err == nil { + if recordEvent(recorder, event) { break } - // If we can't contact the server, then hold everything while we keep trying. - // Otherwise, something about the event is malformed and we should abandon it. - giveUp := false - switch err.(type) { - case *client.RequestConstructionError: - // We will construct the request the same next time, so don't keep trying. - giveUp = true - case *errors.StatusError: - // This indicates that the server understood and rejected our request. - giveUp = true - case *errors.UnexpectedObjectError: - // We don't expect this; it implies the server's response didn't match a - // known pattern. Go ahead and retry. - default: - // This case includes actual http transport errors. Go ahead and retry. - } - if giveUp { - glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err) + tries++ + if tries >= maxTriesPerEvent { + glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) break } - if try >= 3 { - glog.Errorf("Unable to write event '%#v': '%v' (retry limit exceeded!)", event, err) - break + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) + } else { + time.Sleep(sleepDuration) } - glog.Errorf("Unable to write event: '%v' (will retry in 1 second)", err) - time.Sleep(retryEventSleep) } }) } +// recordEvent attempts to write event to recorder. It returns true if the event +// was successfully recorded or discarded, false if it should be retried. +func recordEvent(recorder EventRecorder, event *api.Event) bool { + _, err := recorder.Create(event) + if err == nil { + return true + } + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + giveUp := false + switch err.(type) { + case *client.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + giveUp = true + case *errors.StatusError: + // This indicates that the server understood and rejected our request. + giveUp = true + case *errors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + if giveUp { + glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err) + return true + } + glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return false +} + // StartLogging just logs local events, using the given logging function. The // return value can be ignored or used to stop logging, if desired. func StartLogging(logf func(format string, args ...interface{})) watch.Interface { @@ -120,9 +141,9 @@ func GetEvents(f func(*api.Event)) watch.Interface { return w } -const queueLen = 1000 +const maxQueuedEvents = 1000 -var events = watch.NewBroadcaster(queueLen) +var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) // Event constructs an event from the given information and puts it in the queue for sending. // 'object' is the object this event is about. Event will make a reference-- or you may also diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index b372cffe374..6cf1f74454c 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -19,9 +19,9 @@ package record import ( "fmt" "reflect" + "strconv" "strings" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -31,7 +31,8 @@ import ( ) func init() { - retryEventSleep = 1 * time.Microsecond + // Don't bother sleeping between retries. + sleepDuration = 0 } type testEventRecorder struct { @@ -188,12 +189,12 @@ func TestWriteEventError(t *testing.T) { }, "retry1": { timesToSendError: 1000, - attemptsWanted: 3, + attemptsWanted: 12, err: &errors.UnexpectedObjectError{}, }, "retry2": { timesToSendError: 1000, - attemptsWanted: 3, + attemptsWanted: 12, err: fmt.Errorf("A weird error"), }, "succeedEventually": { @@ -238,3 +239,54 @@ func TestWriteEventError(t *testing.T) { } } } + +func TestLotsOfEvents(t *testing.T) { + recorderCalled := make(chan struct{}) + loggerCalled := make(chan struct{}) + + // Fail each event a few times to ensure there's some load on the tested code. + var counts [1000]int + testEvents := testEventRecorder{ + OnEvent: func(event *api.Event) (*api.Event, error) { + num, err := strconv.Atoi(event.Message) + if err != nil { + t.Error(err) + return event, nil + } + counts[num]++ + if counts[num] < 5 { + return nil, fmt.Errorf("fake error") + } + recorderCalled <- struct{}{} + return event, nil + }, + } + recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"}) + logger := StartLogging(func(formatter string, args ...interface{}) { + loggerCalled <- struct{}{} + }) + + ref := &api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "bar", + APIVersion: "v1beta1", + } + for i := 0; i < maxQueuedEvents; i++ { + go Event(ref, "Status", "Reason", strconv.Itoa(i)) + } + // Make sure no events were dropped by either of the listeners. + for i := 0; i < maxQueuedEvents; i++ { + <-recorderCalled + <-loggerCalled + } + // Make sure that every event was attempted 5 times + for i := 0; i < maxQueuedEvents; i++ { + if counts[i] < 5 { + t.Errorf("Only attempted to record event '%d' %d times.", i, counts[i]) + } + } + recorder.Stop() + logger.Stop() +} diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go index 28a0d66b5eb..46776a786ec 100644 --- a/pkg/registry/registrytest/generic.go +++ b/pkg/registry/registrytest/generic.go @@ -39,7 +39,7 @@ type GenericRegistry struct { func NewGeneric(list runtime.Object) *GenericRegistry { return &GenericRegistry{ ObjectList: list, - Broadcaster: watch.NewBroadcaster(0), + Broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull), } } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 6d60d30f62b..571b61074b2 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -36,7 +36,7 @@ type PodRegistry struct { func NewPodRegistry(pods *api.PodList) *PodRegistry { return &PodRegistry{ Pods: pods, - broadcaster: watch.NewBroadcaster(0), + broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull), } } diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index e4d8bef1605..ed5c09e5129 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -22,6 +22,20 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) +// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch +// channel is full. +type FullChannelBehavior int + +const ( + WaitIfChannelFull FullChannelBehavior = iota + DropIfChannelFull +) + +// Buffer the incoming queue a little bit even though it should rarely ever accumulate +// anything, just in case a few events are received in such a short window that +// Broadcaster can't move them onto the watchers' queues fast enough. +const incomingQueueLength = 25 + // Broadcaster distributes event notifications among any number of watchers. Every event // is delivered to every watcher. type Broadcaster struct { @@ -31,17 +45,27 @@ type Broadcaster struct { nextWatcher int64 incoming chan Event + + // How large to make watcher's channel. + watchQueueLength int + // If one of the watch channels is full, don't wait for it to become empty. + // Instead just deliver it to the watchers that do have space in their + // channels and move on to the next event. + // It's more fair to do this on a per-watcher basis than to do it on the + // "incoming" channel, which would allow one slow watcher to prevent all + // other watchers from getting new events. + fullChannelBehavior FullChannelBehavior } -// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue. -// When queueLength is 0, Action will block until any prior event has been -// completely distributed. It is guaranteed that events will be distibuted in the -// order in which they ocurr, but the order in which a single event is distributed -// among all of the watchers is unspecified. -func NewBroadcaster(queueLength int) *Broadcaster { +// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher. +// It is guaranteed that events will be distibuted in the order in which they ocur, +// but the order in which a single event is distributed among all of the watchers is unspecified. +func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { m := &Broadcaster{ - watchers: map[int64]*broadcasterWatcher{}, - incoming: make(chan Event, queueLength), + watchers: map[int64]*broadcasterWatcher{}, + incoming: make(chan Event, incomingQueueLength), + watchQueueLength: queueLength, + fullChannelBehavior: fullChannelBehavior, } go m.loop() return m @@ -56,7 +80,7 @@ func (m *Broadcaster) Watch() Interface { id := m.nextWatcher m.nextWatcher++ w := &broadcasterWatcher{ - result: make(chan Event), + result: make(chan Event, m.watchQueueLength), stopped: make(chan struct{}), id: id, m: m, @@ -119,10 +143,20 @@ func (m *Broadcaster) loop() { func (m *Broadcaster) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() - for _, w := range m.watchers { - select { - case w.result <- event: - case <-w.stopped: + if m.fullChannelBehavior == DropIfChannelFull { + for _, w := range m.watchers { + select { + case w.result <- event: + case <-w.stopped: + default: // Don't block if the event can't be queued. + } + } + } else { + for _, w := range m.watchers { + select { + case w.result <- event: + case <-w.stopped: + } } } } diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 89584618717..a662605efbe 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -39,7 +39,7 @@ func TestBroadcaster(t *testing.T) { } // The broadcaster we're testing - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) // Add a bunch of watchers const testWatchers = 2 @@ -77,7 +77,7 @@ func TestBroadcaster(t *testing.T) { } func TestBroadcasterWatcherClose(t *testing.T) { - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) w := m.Watch() w2 := m.Watch() w.Stop() @@ -95,7 +95,7 @@ func TestBroadcasterWatcherClose(t *testing.T) { func TestBroadcasterWatcherStopDeadlock(t *testing.T) { done := make(chan bool) - m := NewBroadcaster(0) + m := NewBroadcaster(0, WaitIfChannelFull) go func(w0, w1 Interface) { // We know Broadcaster is in the distribute loop once one watcher receives // an event. Stop the other watcher while distribute is trying to @@ -116,3 +116,52 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) { } m.Shutdown() } + +func TestBroadcasterDropIfChannelFull(t *testing.T) { + m := NewBroadcaster(1, DropIfChannelFull) + + event1 := Event{Added, &myType{"foo", "hello world 1"}} + event2 := Event{Added, &myType{"bar", "hello world 2"}} + + // Add a couple watchers + const testWatchers = 2 + watches := make([]Interface, testWatchers) + for i := 0; i < testWatchers; i++ { + watches[i] = m.Watch() + } + + // Send a couple events before closing the broadcast channel. + t.Log("Sending event 1") + m.Action(event1.Type, event1.Object) + t.Log("Sending event 2") + m.Action(event2.Type, event2.Object) + m.Shutdown() + + // Pull events from the queue. + wg := sync.WaitGroup{} + wg.Add(testWatchers) + for i := 0; i < testWatchers; i++ { + // Verify that each watcher only gets the first event because its watch + // queue of length one was full from the first one. + go func(watcher int, w Interface) { + defer wg.Done() + e1, ok := <-w.ResultChan() + if !ok { + t.Error("Watcher %v failed to retrieve first event.") + return + } + if e, a := event1, e1; !reflect.DeepEqual(e, a) { + t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", + watcher, e.Type, e.Object, a.Type, a.Object) + } else { + t.Logf("Got (%v, %#v)", e1.Type, e1.Object) + } + e2, ok := <-w.ResultChan() + if ok { + t.Error("Watcher %v received second event (%v, %#v) even though it shouldn't have.", + watcher, e2.Type, e2.Object) + } + }(i, watches[i]) + } + wg.Wait() +}