Added config to EventCorrelator in client-go

Added CorrelatorOptions that contains options to change the
defaults in EventSourceObjectSpamFilter and EventAggregator
in EventCorrelator. Added a eventCorrelator property to the
eventBroadcasterImpl to help with this.

Kubernetes-commit: 9d8e6fb1b9cf2d3fac8139a97334287e33ff911f
This commit is contained in:
vivekbagade 2019-03-07 10:39:49 +01:00 committed by Kubernetes Publisher
parent 1300bc81aa
commit 060f7d3455
2 changed files with 98 additions and 3 deletions

View File

@ -50,6 +50,40 @@ type EventSink interface {
Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}
// CorrelatorOptions allows you to change the default of the EventSourceObjectSpamFilter
// and EventAggregator in EventCorrelator
type CorrelatorOptions struct {
// The lru cache size used for both EventSourceObjectSpamFilter and the EventAggregator
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the LRUCacheSize has to be greater than 0.
LRUCacheSize int
// The burst size used by the token bucket rate filtering in EventSourceObjectSpamFilter
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the BurstSize has to be greater than 0.
BurstSize int
// The fill rate of the token bucket in queries per second in EventSourceObjectSpamFilter
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the QPS has to be greater than 0.
QPS float32
// The func used by the EventAggregator to group event keys for aggregation
// If not specified (zero value), EventAggregatorByReasonFunc will be used
KeyFunc EventAggregatorKeyFunc
// The func used by the EventAggregator to produced aggregated message
// If not specified (zero value), EventAggregatorByReasonMessageFunc will be used
MessageFunc EventAggregatorMessageFunc
// The number of events in an interval before aggregation happens by the EventAggregator
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the MaxEvents has to be greater than 0
MaxEvents int
// The amount of time in seconds that must transpire since the last occurrence of a similar event before it is considered new by the EventAggregator
// If not specified (zero value), the default specified in events_cache.go will be picked
// This means that the MaxIntervalInSeconds has to be greater than 0
MaxIntervalInSeconds int
// The clock used by the EventAggregator to allow for testing
// If not specified (zero value), clock.RealClock{} will be used
Clock clock.Clock
}
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
@ -97,16 +131,31 @@ type EventBroadcaster interface {
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
}
}
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: sleepDuration,
}
}
func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
options: options,
}
}
type eventBroadcasterImpl struct {
*watch.Broadcaster
sleepDuration time.Duration
options CorrelatorOptions
}
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
@ -116,7 +165,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
eventCorrelator := NewEventCorrelator(clock.RealClock{})
eventCorrelator := NewEventCorrelatorWithOptions(eventBroadcaster.options)
return eventBroadcaster.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)

View File

@ -443,6 +443,52 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
}
}
func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator {
optionsWithDefaults := populateDefaults(options)
spamFilter := NewEventSourceObjectSpamFilter(optionsWithDefaults.LRUCacheSize,
optionsWithDefaults.BurstSize, optionsWithDefaults.QPS, optionsWithDefaults.Clock)
return &EventCorrelator{
filterFunc: spamFilter.Filter,
aggregator: NewEventAggregator(
optionsWithDefaults.LRUCacheSize,
optionsWithDefaults.KeyFunc,
optionsWithDefaults.MessageFunc,
optionsWithDefaults.MaxEvents,
optionsWithDefaults.MaxIntervalInSeconds,
optionsWithDefaults.Clock),
logger: newEventLogger(optionsWithDefaults.LRUCacheSize, optionsWithDefaults.Clock),
}
}
// populateDefaults populates the zero value options with defaults
func populateDefaults(options CorrelatorOptions) CorrelatorOptions {
if options.LRUCacheSize == 0 {
options.LRUCacheSize = maxLruCacheEntries
}
if options.BurstSize == 0 {
options.BurstSize = defaultSpamBurst
}
if options.QPS == 0 {
options.QPS = defaultSpamQPS
}
if options.KeyFunc == nil {
options.KeyFunc = EventAggregatorByReasonFunc
}
if options.MessageFunc == nil {
options.MessageFunc = EventAggregatorByReasonMessageFunc
}
if options.MaxEvents == 0 {
options.MaxEvents = defaultAggregateMaxEvents
}
if options.MaxIntervalInSeconds == 0 {
options.MaxIntervalInSeconds = defaultAggregateIntervalInSeconds
}
if options.Clock == nil {
options.Clock = clock.RealClock{}
}
return options
}
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
if newEvent == nil {