mirror of
https://github.com/kubernetes/client-go.git
synced 2026-01-24 22:15:18 +00:00
This changes the event aggregation behavior so that, when multiple events are deduplicated, the aggregated event includes the message of the latest related event. This fixes an issue where the original event expires due to TTL, and the aggregate event doesn't contain any useful message. Kubernetes-commit: 6ada269d8f199b02dd5852f1545746a14f58f4eb
377 lines
12 KiB
Go
377 lines
12 KiB
Go
/*
|
|
Copyright 2015 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package record
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/groupcache/lru"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/clock"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
|
"k8s.io/client-go/pkg/api/v1"
|
|
)
|
|
|
|
const (
	// maxLruCacheEntries bounds the LRU caches used for aggregation and
	// de-duplication state so memory use stays fixed under event storms.
	maxLruCacheEntries = 4096

	// if we see the same event that varies only by message
	// more than 10 times in a 10 minute period, aggregate the event
	defaultAggregateMaxEvents = 10
	defaultAggregateIntervalInSeconds = 600
)
|
|
|
|
// getEventKey builds unique event key based on source, involvedObject, reason, message
|
|
func getEventKey(event *v1.Event) string {
|
|
return strings.Join([]string{
|
|
event.Source.Component,
|
|
event.Source.Host,
|
|
event.InvolvedObject.Kind,
|
|
event.InvolvedObject.Namespace,
|
|
event.InvolvedObject.Name,
|
|
string(event.InvolvedObject.UID),
|
|
event.InvolvedObject.APIVersion,
|
|
event.Type,
|
|
event.Reason,
|
|
event.Message,
|
|
},
|
|
"")
|
|
}
|
|
|
|
// EventFilterFunc is a function that returns true if the event should be
// skipped (dropped before any aggregation or de-duplication happens).
type EventFilterFunc func(event *v1.Event) bool
|
|
|
|
// DefaultEventFilterFunc returns false for all incoming events
|
|
func DefaultEventFilterFunc(event *v1.Event) bool {
|
|
return false
|
|
}
|
|
|
|
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
// It returns a tuple of the following:
// aggregateKey - key that identifies the aggregate group to bucket this event
// localKey - key that identifies this event within the local group
type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
|
|
|
|
// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
|
|
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
|
|
return strings.Join([]string{
|
|
event.Source.Component,
|
|
event.Source.Host,
|
|
event.InvolvedObject.Kind,
|
|
event.InvolvedObject.Namespace,
|
|
event.InvolvedObject.Name,
|
|
string(event.InvolvedObject.UID),
|
|
event.InvolvedObject.APIVersion,
|
|
event.Type,
|
|
event.Reason,
|
|
},
|
|
""), event.Message
|
|
}
|
|
|
|
// EventAggregatorMessageFunc is responsible for producing the message used
// on an aggregate event, given the latest incoming event.
type EventAggregatorMessageFunc func(event *v1.Event) string
|
|
|
|
// EventAggregatorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
// with a note that it was combined from similar events.
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
	return "(combined from similar events): " + event.Message
}
|
|
|
|
// EventAggregator identifies similar events and aggregates them into a single event.
// All methods must hold the embedded lock; the LRU cache itself is not safe
// for concurrent use.
type EventAggregator struct {
	sync.RWMutex

	// The cache that manages aggregation state, keyed by aggregateKey.
	cache *lru.Cache

	// The function that groups events for aggregation.
	keyFunc EventAggregatorKeyFunc

	// The function that generates a message for an aggregate event.
	messageFunc EventAggregatorMessageFunc

	// The maximum number of unique local keys seen in the interval before
	// aggregation kicks in.
	maxEvents uint

	// The amount of time in seconds that must transpire since the last
	// occurrence of a similar event before it's considered new.
	maxIntervalInSeconds uint

	// clock is injected to allow tests to control the passage of time.
	clock clock.Clock
}
|
|
|
|
// NewEventAggregator returns a new instance of an EventAggregator
|
|
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
|
|
maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
|
|
return &EventAggregator{
|
|
cache: lru.New(lruCacheSize),
|
|
keyFunc: keyFunc,
|
|
messageFunc: messageFunc,
|
|
maxEvents: uint(maxEvents),
|
|
maxIntervalInSeconds: uint(maxIntervalInSeconds),
|
|
clock: clock,
|
|
}
|
|
}
|
|
|
|
// aggregateRecord holds data used to perform aggregation decisions
type aggregateRecord struct {
	// we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
	// if the size of this set exceeds the max, we know we need to aggregate
	localKeys sets.String
	// The last time at which the aggregate was recorded; used to decide when
	// a record is stale and should be replaced with a fresh one.
	lastTimestamp metav1.Time
}
|
|
|
|
// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
//   the full key for normal events, and to the aggregate key (the first value
//   returned by the EventAggregatorKeyFunc) for aggregate events, so that
//   successive aggregates overwrite each other in the logger's cache.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	// (localKeys is a set backed by a map, so this mutates the cached record too)
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	// The message comes from messageFunc applied to the *latest* event, so the
	// aggregate stays meaningful even after earlier events expire via TTL.
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}
|
|
|
|
// eventLog records data about when an event was observed
type eventLog struct {
	// The number of times the event has occurred since first occurrence.
	count uint

	// The time at which the event was first recorded.
	firstTimestamp metav1.Time

	// The unique name of the first occurrence of this event
	name string

	// Resource version returned from previous interaction with server,
	// required so a subsequent patch targets the right object version.
	resourceVersion string
}
|
|
|
|
// eventLogger logs occurrences of an event. The embedded lock must be held
// around every cache access; the LRU cache is not concurrency-safe.
type eventLogger struct {
	sync.RWMutex
	// cache maps event keys to their most recent eventLog observation.
	cache *lru.Cache
	// clock is injected to allow tests to control the passage of time.
	clock clock.Clock
}
|
|
|
|
// newEventLogger observes events and counts their frequencies
|
|
func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
|
|
return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
|
|
}
|
|
|
|
// eventObserve records an event, or updates an existing one if key is a cache hit
|
|
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
|
|
var (
|
|
patch []byte
|
|
err error
|
|
)
|
|
eventCopy := *newEvent
|
|
event := &eventCopy
|
|
|
|
e.Lock()
|
|
defer e.Unlock()
|
|
|
|
// Check if there is an existing event we should update
|
|
lastObservation := e.lastEventObservationFromCache(key)
|
|
|
|
// If we found a result, prepare a patch
|
|
if lastObservation.count > 0 {
|
|
// update the event based on the last observation so patch will work as desired
|
|
event.Name = lastObservation.name
|
|
event.ResourceVersion = lastObservation.resourceVersion
|
|
event.FirstTimestamp = lastObservation.firstTimestamp
|
|
event.Count = int32(lastObservation.count) + 1
|
|
|
|
eventCopy2 := *event
|
|
eventCopy2.Count = 0
|
|
eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
|
|
eventCopy2.Message = ""
|
|
|
|
newData, _ := json.Marshal(event)
|
|
oldData, _ := json.Marshal(eventCopy2)
|
|
patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
|
|
}
|
|
|
|
// record our new observation
|
|
e.cache.Add(
|
|
key,
|
|
eventLog{
|
|
count: uint(event.Count),
|
|
firstTimestamp: event.FirstTimestamp,
|
|
name: event.Name,
|
|
resourceVersion: event.ResourceVersion,
|
|
},
|
|
)
|
|
return event, patch, err
|
|
}
|
|
|
|
// updateState updates its internal tracking information based on latest server state
|
|
func (e *eventLogger) updateState(event *v1.Event) {
|
|
key := getEventKey(event)
|
|
e.Lock()
|
|
defer e.Unlock()
|
|
// record our new observation
|
|
e.cache.Add(
|
|
key,
|
|
eventLog{
|
|
count: uint(event.Count),
|
|
firstTimestamp: event.FirstTimestamp,
|
|
name: event.Name,
|
|
resourceVersion: event.ResourceVersion,
|
|
},
|
|
)
|
|
}
|
|
|
|
// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
|
|
func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
|
|
value, ok := e.cache.Get(key)
|
|
if ok {
|
|
observationValue, ok := value.(eventLog)
|
|
if ok {
|
|
return observationValue
|
|
}
|
|
}
|
|
return eventLog{}
|
|
}
|
|
|
|
// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter all
// incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur
// frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication
// to ensure events that are observed multiple times are compacted into a single event with increasing counts.
type EventCorrelator struct {
	// the function to filter the event; returning true drops it entirely
	filterFunc EventFilterFunc
	// the object that performs event aggregation across similar events
	aggregator *EventAggregator
	// the object that observes events as they come through and computes
	// de-duplication counts/patches
	logger *eventLogger
}
|
|
|
|
// EventCorrelateResult is the result of a Correlate
type EventCorrelateResult struct {
	// the event after correlation (may be an aggregate event rather than the input)
	Event *v1.Event
	// if provided, perform a strategic patch when updating the record on the server
	Patch []byte
	// if true, do no further processing of the event (it was filtered)
	Skip bool
}
|
|
|
|
// NewEventCorrelator returns an EventCorrelator configured with default values.
|
|
//
|
|
// The EventCorrelator is responsible for event filtering, aggregating, and counting
|
|
// prior to interacting with the API server to record the event.
|
|
//
|
|
// The default behavior is as follows:
|
|
// * No events are filtered from being recorded
|
|
// * Aggregation is performed if a similar event is recorded 10 times in a
|
|
// in a 10 minute rolling interval. A similar event is an event that varies only by
|
|
// the Event.Message field. Rather than recording the precise event, aggregation
|
|
// will create a new event whose message reports that it has combined events with
|
|
// the same reason.
|
|
// * Events are incrementally counted if the exact same event is encountered multiple
|
|
// times.
|
|
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
|
cacheSize := maxLruCacheEntries
|
|
return &EventCorrelator{
|
|
filterFunc: DefaultEventFilterFunc,
|
|
aggregator: NewEventAggregator(
|
|
cacheSize,
|
|
EventAggregatorByReasonFunc,
|
|
EventAggregatorByReasonMessageFunc,
|
|
defaultAggregateMaxEvents,
|
|
defaultAggregateIntervalInSeconds,
|
|
clock),
|
|
|
|
logger: newEventLogger(cacheSize, clock),
|
|
}
|
|
}
|
|
|
|
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
|
|
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
|
|
if c.filterFunc(newEvent) {
|
|
return &EventCorrelateResult{Skip: true}, nil
|
|
}
|
|
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
|
|
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
|
|
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
|
|
}
|
|
|
|
// UpdateState based on the latest observed state from server
|
|
func (c *EventCorrelator) UpdateState(event *v1.Event) {
|
|
c.logger.updateState(event)
|
|
}
|