generate new event cache for every event sink

This commit is contained in:
jiangyaoguo
2015-09-07 00:04:29 +08:00
parent 8a95a82c88
commit e4ef28f096
4 changed files with 381 additions and 91 deletions

View File

@@ -104,6 +104,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
var eventCache *historyCache = NewEventCache()
return eventBroadcaster.StartEventWatcher(
func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
@@ -111,7 +112,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
eventCopy := *event
event = &eventCopy
previousEvent := getEvent(event)
previousEvent := eventCache.getEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
event.Count = previousEvent.Count + 1
@@ -122,7 +123,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
tries := 0
for {
if recordEvent(sink, event, updateExistingEvent) {
if recordEvent(sink, event, updateExistingEvent, eventCache) {
break
}
tries++
@@ -156,7 +157,7 @@ func isKeyNotFoundError(err error) bool {
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) bool {
func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool, eventCache *historyCache) bool {
var newEvent *api.Event
var err error
if updateExistingEvent {
@@ -169,7 +170,7 @@ func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) boo
newEvent, err = sink.Create(event)
}
if err == nil {
addOrUpdateEvent(newEvent)
eventCache.addOrUpdateEvent(newEvent)
return true
}