diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 91b93cdc48d..191a991cc7d 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -18,6 +18,7 @@ package record import ( "fmt" + "math" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -25,13 +26,22 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) -// retryEventSleep is the time between record failures to retry. Available for test alteration. -var retryEventSleep = 1 * time.Second +const ( + maxQueuedEvents = 1000 + maxTriesPerEvent = 10 +) + +var ( + minSleep = float64(1 * time.Second) + maxSleep = float64(15 * time.Second) + backoffExp = 1.5 +) // EventRecorder knows how to store events (client.Client implements it.) // EventRecorder must respect the namespace that will be embedded in 'event'. @@ -46,49 +56,82 @@ type EventRecorder interface { // or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { + // Set up our own personal buffer of events so that we can clear out GetEvents' + // broadcast channel as quickly as possible to avoid causing the relatively more + // important event-producing goroutines from blocking while trying to insert events. + eventQueue := make(chan *api.Event, maxQueuedEvents) + + // Run a function in the background that grabs events off the queue and tries + // to record them, retrying as appropriate to try to avoid dropping any. + go func() { + defer util.HandleCrash() + for event := range eventQueue { + tries := 0 + for { + if recordEvent(recorder, event) { + break + } + tries++ + if tries >= maxTriesPerEvent { + glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + break + } + sleepDuration := time.Duration( + math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1)))) + time.Sleep(wait.Jitter(sleepDuration, 0.5)) + } + } + }() + + // Finally, kick off the watcher that takes events from the channel and puts them + // onto the queue. return GetEvents(func(event *api.Event) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event event = &eventCopy event.Source = source - try := 0 - for { - try++ - _, err := recorder.Create(event) - if err == nil { - break - } - // If we can't contact the server, then hold everything while we keep trying. - // Otherwise, something about the event is malformed and we should abandon it. - giveUp := false - switch err.(type) { - case *client.RequestConstructionError: - // We will construct the request the same next time, so don't keep trying. - giveUp = true - case *errors.StatusError: - // This indicates that the server understood and rejected our request. - giveUp = true - case *errors.UnexpectedObjectError: - // We don't expect this; it implies the server's response didn't match a - // known pattern. Go ahead and retry. - default: - // This case includes actual http transport errors. Go ahead and retry. - } - if giveUp { - glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err) - break - } - if try >= 3 { - glog.Errorf("Unable to write event '%#v': '%v' (retry limit exceeded!)", event, err) - break - } - glog.Errorf("Unable to write event: '%v' (will retry in 1 second)", err) - time.Sleep(retryEventSleep) + // Drop new events rather than old ones because the old ones may contain + // some information explaining why everything is so backed up. + if len(eventQueue) == maxQueuedEvents { + glog.Errorf("Unable to write event '%#v' (event buffer full!)", event) + } else { + eventQueue <- event } }) } +// recordEvent attempts to write event to recorder. It returns true if the event +// was successfully recorded or discarded, false if it should be retried. +func recordEvent(recorder EventRecorder, event *api.Event) bool { + _, err := recorder.Create(event) + if err == nil { + return true + } + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + giveUp := false + switch err.(type) { + case *client.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + giveUp = true + case *errors.StatusError: + // This indicates that the server understood and rejected our request. + giveUp = true + case *errors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + if giveUp { + glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err) + return true + } + glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return false +} + // StartLogging just logs local events, using the given logging function. The // return value can be ignored or used to stop logging, if desired. func StartLogging(logf func(format string, args ...interface{})) watch.Interface { diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index 0d64ebaa7a6..e749cd53ffc 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -19,6 +19,7 @@ package record import ( "fmt" "reflect" + "strconv" "strings" "testing" "time" @@ -31,7 +32,9 @@ import ( ) func init() { - retryEventSleep = 1 * time.Microsecond + // Don't bother sleeping between retries. + minSleep = 0 + maxSleep = 0 } type testEventRecorder struct { @@ -192,12 +195,12 @@ func TestWriteEventError(t *testing.T) { }, "retry1": { timesToSendError: 1000, - attemptsWanted: 3, + attemptsWanted: 10, err: &errors.UnexpectedObjectError{}, }, "retry2": { timesToSendError: 1000, - attemptsWanted: 3, + attemptsWanted: 10, err: fmt.Errorf("A weird error"), }, "succeedEventually": { @@ -242,3 +245,54 @@ func TestWriteEventError(t *testing.T) { } } } + +func TestLotsOfEvents(t *testing.T) { + recorderCalled := make(chan struct{}) + loggerCalled := make(chan struct{}) + + // Fail each event a few times to ensure there's some load on the tested code. + var counts [1000]int + testEvents := testEventRecorder{ + OnEvent: func(event *api.Event) (*api.Event, error) { + num, err := strconv.Atoi(event.Message) + if err != nil { + t.Error(err) + return event, nil + } + counts[num]++ + if counts[num] < 5 { + return nil, fmt.Errorf("fake error") + } + recorderCalled <- struct{}{} + return event, nil + }, + } + recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"}) + logger := StartLogging(func(formatter string, args ...interface{}) { + loggerCalled <- struct{}{} + }) + + ref := &api.ObjectReference{ + Kind: "Pod", + Name: "foo", + Namespace: "baz", + UID: "bar", + APIVersion: "v1beta1", + } + for i := 0; i < maxQueuedEvents; i++ { + go Event(ref, "Status", "Reason", strconv.Itoa(i)) + } + // Make sure no events were dropped by either of the listeners. + for i := 0; i < maxQueuedEvents; i++ { + <-recorderCalled + <-loggerCalled + } + // Make sure that every event was attempted 5 times + for i := 0; i < maxQueuedEvents; i++ { + if counts[i] < 5 { + t.Errorf("Only attempted to record event '%d' %d times.", i, counts[i]) + } + } + recorder.Stop() + logger.Stop() +}