Merge pull request #46034 from kensimon/canonical-aggregate-events

Automatic merge from submit-queue (batch tested with PRs 46726, 41912, 46695, 46034, 46551)

Event aggregation: include latest event message in aggregate event

**What this PR does / why we need it**:

This changes the event aggregation behavior so that, when multiple events are deduplicated, the aggregated event includes the message of the latest related event.

This fixes an issue where the original event expires due to TTL, and the aggregate event doesn't contain any useful message.

**Which issue this PR fixes**:

fixes #45971

```release-note
Duplicate recurring Events now include the latest event's Message string
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-02 21:42:41 -07:00 committed by GitHub
commit 047c4667fe
2 changed files with 38 additions and 21 deletions

View File

@ -93,7 +93,7 @@ type EventAggregatorMessageFunc func(event *v1.Event) string
// EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
return "(events with common reason combined)"
return "(combined from similar events): " + event.Message
}
// EventAggregator identifies similar events and aggregates them into a single event
@ -141,11 +141,22 @@ type aggregateRecord struct {
lastTimestamp metav1.Time
}
// EventAggregate identifies similar events and groups into a common event if required
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error) {
aggregateKey, localKey := e.keyFunc(newEvent)
// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
// the full key for normal events, and to the result of
// EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
now := metav1.NewTime(e.clock.Now())
record := aggregateRecord{localKeys: sets.NewString(), lastTimestamp: now}
var record aggregateRecord
// eventKey is the full cache key for this event
eventKey := getEventKey(newEvent)
// aggregateKey is for the aggregate event, if one is needed.
aggregateKey, localKey := e.keyFunc(newEvent)
// Do we have a record of similar events in our cache?
e.Lock()
defer e.Unlock()
value, found := e.cache.Get(aggregateKey)
@ -153,24 +164,30 @@ func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error)
record = value.(aggregateRecord)
}
// if the last event was far enough in the past, it is not aggregated, and we must reset state
// Is the previous record too old? If so, make a fresh one. Note: if we didn't
// find a similar record, its lastTimestamp will be the zero value, so we
// create a new one in that case.
maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
interval := now.Time.Sub(record.lastTimestamp.Time)
if interval > maxInterval {
record = aggregateRecord{localKeys: sets.NewString()}
}
// Write the new event into the aggregation record and put it on the cache
record.localKeys.Insert(localKey)
record.lastTimestamp = now
e.cache.Add(aggregateKey, record)
// If we are not yet over the threshold for unique events, don't correlate them
if uint(record.localKeys.Len()) < e.maxEvents {
return newEvent, nil
return newEvent, eventKey
}
// do not grow our local key set any larger than max
record.localKeys.PopAny()
// create a new aggregate event
// create a new aggregate event, and return the aggregateKey as the cache key
// (so that it can be overwritten.)
eventCopy := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
@ -185,7 +202,7 @@ func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error)
Reason: newEvent.Reason,
Source: newEvent.Source,
}
return eventCopy, nil
return eventCopy, aggregateKey
}
// eventLog records data about when an event was observed
@ -215,22 +232,22 @@ func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
}
// eventObserve records the event, and determines if its frequency should update
func (e *eventLogger) eventObserve(newEvent *v1.Event) (*v1.Event, []byte, error) {
// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
var (
patch []byte
err error
)
key := getEventKey(newEvent)
eventCopy := *newEvent
event := &eventCopy
e.Lock()
defer e.Unlock()
// Check if there is an existing event we should update
lastObservation := e.lastEventObservationFromCache(key)
// we have seen this event before, so we must prepare a patch
// If we found a result, prepare a patch
if lastObservation.count > 0 {
// update the event based on the last observation so patch will work as desired
event.Name = lastObservation.name
@ -241,6 +258,7 @@ func (e *eventLogger) eventObserve(newEvent *v1.Event) (*v1.Event, []byte, error
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
eventCopy2.Message = ""
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
@ -337,6 +355,7 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
defaultAggregateMaxEvents,
defaultAggregateIntervalInSeconds,
clock),
logger: newEventLogger(cacheSize, clock),
}
}
@ -346,11 +365,8 @@ func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateRes
if c.filterFunc(newEvent) {
return &EventCorrelateResult{Skip: true}, nil
}
aggregateEvent, err := c.aggregator.EventAggregate(newEvent)
if err != nil {
return &EventCorrelateResult{}, err
}
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent)
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

View File

@ -157,10 +157,11 @@ func TestEventAggregatorByReasonFunc(t *testing.T) {
// TestEventAggregatorByReasonMessageFunc validates the proper output for an aggregate message
func TestEventAggregatorByReasonMessageFunc(t *testing.T) {
expected := "(events with common reason combined)"
expectedPrefix := "(combined from similar events): "
event1 := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other"))
if actual := EventAggregatorByReasonMessageFunc(&event1); expected != actual {
t.Errorf("Expected %v got %v", expected, actual)
actual := EventAggregatorByReasonMessageFunc(&event1)
if !strings.HasPrefix(actual, expectedPrefix) {
t.Errorf("Expected %v to begin with prefix %v", actual, expectedPrefix)
}
}