From 6ada269d8f199b02dd5852f1545746a14f58f4eb Mon Sep 17 00:00:00 2001 From: Ken Simon Date: Wed, 17 May 2017 10:34:40 -0700 Subject: [PATCH] Include event messages in aggregated events This changes the event aggregation behavior so that, when multiple events are deduplicated, the aggregated event includes the message of the latest related event. This fixes an issue where the original event expires due to TTL, and the aggregate event doesn't contain any useful message. --- .../client-go/tools/record/events_cache.go | 52 ++++++++++++------- .../tools/record/events_cache_test.go | 7 +-- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/record/events_cache.go b/staging/src/k8s.io/client-go/tools/record/events_cache.go index f5c0ca35ff1..bde110bf35f 100644 --- a/staging/src/k8s.io/client-go/tools/record/events_cache.go +++ b/staging/src/k8s.io/client-go/tools/record/events_cache.go @@ -93,7 +93,7 @@ type EventAggregatorMessageFunc func(event *v1.Event) string // EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message func EventAggregatorByReasonMessageFunc(event *v1.Event) string { - return "(events with common reason combined)" + return "(combined from similar events): " + event.Message } // EventAggregator identifies similar events and aggregates them into a single event @@ -141,11 +141,22 @@ type aggregateRecord struct { lastTimestamp metav1.Time } -// EventAggregate identifies similar events and groups into a common event if required -func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error) { - aggregateKey, localKey := e.keyFunc(newEvent) +// EventAggregate checks if a similar event has been seen according to the +// aggregation configuration (max events, max interval, etc) and returns: +// +// - The (potentially modified) event that should be created +// - The cache key for the event, for correlation purposes. This will be set to +// the full key for normal events, and to the result of +// EventAggregatorMessageFunc for aggregate events. +func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) { now := metav1.NewTime(e.clock.Now()) - record := aggregateRecord{localKeys: sets.NewString(), lastTimestamp: now} + var record aggregateRecord + // eventKey is the full cache key for this event + eventKey := getEventKey(newEvent) + // aggregateKey is for the aggregate event, if one is needed. + aggregateKey, localKey := e.keyFunc(newEvent) + + // Do we have a record of similar events in our cache? e.Lock() defer e.Unlock() value, found := e.cache.Get(aggregateKey) @@ -153,24 +164,30 @@ func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error) record = value.(aggregateRecord) } - // if the last event was far enough in the past, it is not aggregated, and we must reset state + // Is the previous record too old? If so, make a fresh one. Note: if we didn't + // find a similar record, its lastTimestamp will be the zero value, so we + // create a new one in that case. maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second interval := now.Time.Sub(record.lastTimestamp.Time) if interval > maxInterval { record = aggregateRecord{localKeys: sets.NewString()} } + + // Write the new event into the aggregation record and put it on the cache record.localKeys.Insert(localKey) record.lastTimestamp = now e.cache.Add(aggregateKey, record) + // If we are not yet over the threshold for unique events, don't correlate them if uint(record.localKeys.Len()) < e.maxEvents { - return newEvent, nil + return newEvent, eventKey } // do not grow our local key set any larger than max record.localKeys.PopAny() - // create a new aggregate event + // create a new aggregate event, and return the aggregateKey as the cache key + // (so that it can be overwritten.) eventCopy := &v1.Event{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()), @@ -185,7 +202,7 @@ func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error) Reason: newEvent.Reason, Source: newEvent.Source, } - return eventCopy, nil + return eventCopy, aggregateKey } // eventLog records data about when an event was observed @@ -215,22 +232,22 @@ func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger { return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock} } -// eventObserve records the event, and determines if its frequency should update -func (e *eventLogger) eventObserve(newEvent *v1.Event) (*v1.Event, []byte, error) { +// eventObserve records an event, or updates an existing one if key is a cache hit +func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) { var ( patch []byte err error ) - key := getEventKey(newEvent) eventCopy := *newEvent event := &eventCopy e.Lock() defer e.Unlock() + // Check if there is an existing event we should update lastObservation := e.lastEventObservationFromCache(key) - // we have seen this event before, so we must prepare a patch + // If we found a result, prepare a patch if lastObservation.count > 0 { // update the event based on the last observation so patch will work as desired event.Name = lastObservation.name @@ -241,6 +258,7 @@ func (e *eventLogger) eventObserve(newEvent *v1.Event) (*v1.Event, []byte, error eventCopy2 := *event eventCopy2.Count = 0 eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0)) + eventCopy2.Message = "" newData, _ := json.Marshal(event) oldData, _ := json.Marshal(eventCopy2) @@ -337,6 +355,7 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator { defaultAggregateMaxEvents, defaultAggregateIntervalInSeconds, clock), + logger: newEventLogger(cacheSize, clock), } } @@ -346,11 +365,8 @@ func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateRes if c.filterFunc(newEvent) { return &EventCorrelateResult{Skip: true}, nil } - aggregateEvent, err := c.aggregator.EventAggregate(newEvent) - if err != nil { - return &EventCorrelateResult{}, err - } - observedEvent, patch, err := c.logger.eventObserve(aggregateEvent) + aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent) + observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey) return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err } diff --git a/staging/src/k8s.io/client-go/tools/record/events_cache_test.go b/staging/src/k8s.io/client-go/tools/record/events_cache_test.go index 614b214ee49..2fdba3b8ed5 100644 --- a/staging/src/k8s.io/client-go/tools/record/events_cache_test.go +++ b/staging/src/k8s.io/client-go/tools/record/events_cache_test.go @@ -157,10 +157,11 @@ func TestEventAggregatorByReasonFunc(t *testing.T) { // TestEventAggregatorByReasonMessageFunc validates the proper output for an aggregate message func TestEventAggregatorByReasonMessageFunc(t *testing.T) { - expected := "(events with common reason combined)" + expectedPrefix := "(combined from similar events): " event1 := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other")) - if actual := EventAggregatorByReasonMessageFunc(&event1); expected != actual { - t.Errorf("Expected %v got %v", expected, actual) + actual := EventAggregatorByReasonMessageFunc(&event1) + if !strings.HasPrefix(actual, expectedPrefix) { + t.Errorf("Expected %v to begin with prefix %v", actual, expectedPrefix) } }