mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Allow customizing spam filtering in event client library
This commit is contained in:
parent
ee5df7cbcf
commit
00080d400f
@ -82,6 +82,9 @@ type CorrelatorOptions struct {
|
|||||||
// The clock used by the EventAggregator to allow for testing
|
// The clock used by the EventAggregator to allow for testing
|
||||||
// If not specified (zero value), clock.RealClock{} will be used
|
// If not specified (zero value), clock.RealClock{} will be used
|
||||||
Clock clock.Clock
|
Clock clock.Clock
|
||||||
|
// The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place
|
||||||
|
// If not specified (zero value), getSpamKey will be used
|
||||||
|
SpamKeyFunc EventSpamKeyFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventRecorder knows how to record events on behalf of an EventSource.
|
// EventRecorder knows how to record events on behalf of an EventSource.
|
||||||
|
@ -81,6 +81,9 @@ func getSpamKey(event *v1.Event) string {
|
|||||||
"")
|
"")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EventSpamKeyFunc is a function that returns unique key based on provided event
|
||||||
|
type EventSpamKeyFunc func(event *v1.Event) string
|
||||||
|
|
||||||
// EventFilterFunc is a function that returns true if the event should be skipped
|
// EventFilterFunc is a function that returns true if the event should be skipped
|
||||||
type EventFilterFunc func(event *v1.Event) bool
|
type EventFilterFunc func(event *v1.Event) bool
|
||||||
|
|
||||||
@ -100,15 +103,19 @@ type EventSourceObjectSpamFilter struct {
|
|||||||
|
|
||||||
// clock is used to allow for testing over a time interval
|
// clock is used to allow for testing over a time interval
|
||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
|
|
||||||
|
// spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events.
|
||||||
|
spamKeyFunc EventSpamKeyFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
|
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
|
||||||
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
|
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
|
||||||
return &EventSourceObjectSpamFilter{
|
return &EventSourceObjectSpamFilter{
|
||||||
cache: lru.New(lruCacheSize),
|
cache: lru.New(lruCacheSize),
|
||||||
burst: burst,
|
burst: burst,
|
||||||
qps: qps,
|
qps: qps,
|
||||||
clock: clock,
|
clock: clock,
|
||||||
|
spamKeyFunc: spamKeyFunc,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,8 +129,8 @@ type spamRecord struct {
|
|||||||
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
|
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
|
||||||
var record spamRecord
|
var record spamRecord
|
||||||
|
|
||||||
// controls our cached information about this event (source+object)
|
// controls our cached information about this event
|
||||||
eventKey := getSpamKey(event)
|
eventKey := f.spamKeyFunc(event)
|
||||||
|
|
||||||
// do we have a record of similar events in our cache?
|
// do we have a record of similar events in our cache?
|
||||||
f.Lock()
|
f.Lock()
|
||||||
@ -431,7 +438,7 @@ type EventCorrelateResult struct {
|
|||||||
// per object of 1 event every 5 minutes to control long-tail of spam.
|
// per object of 1 event every 5 minutes to control long-tail of spam.
|
||||||
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
||||||
cacheSize := maxLruCacheEntries
|
cacheSize := maxLruCacheEntries
|
||||||
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
|
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
|
||||||
return &EventCorrelator{
|
return &EventCorrelator{
|
||||||
filterFunc: spamFilter.Filter,
|
filterFunc: spamFilter.Filter,
|
||||||
aggregator: NewEventAggregator(
|
aggregator: NewEventAggregator(
|
||||||
@ -448,8 +455,12 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
|||||||
|
|
||||||
func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator {
|
func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator {
|
||||||
optionsWithDefaults := populateDefaults(options)
|
optionsWithDefaults := populateDefaults(options)
|
||||||
spamFilter := NewEventSourceObjectSpamFilter(optionsWithDefaults.LRUCacheSize,
|
spamFilter := NewEventSourceObjectSpamFilter(
|
||||||
optionsWithDefaults.BurstSize, optionsWithDefaults.QPS, optionsWithDefaults.Clock)
|
optionsWithDefaults.LRUCacheSize,
|
||||||
|
optionsWithDefaults.BurstSize,
|
||||||
|
optionsWithDefaults.QPS,
|
||||||
|
optionsWithDefaults.Clock,
|
||||||
|
optionsWithDefaults.SpamKeyFunc)
|
||||||
return &EventCorrelator{
|
return &EventCorrelator{
|
||||||
filterFunc: spamFilter.Filter,
|
filterFunc: spamFilter.Filter,
|
||||||
aggregator: NewEventAggregator(
|
aggregator: NewEventAggregator(
|
||||||
@ -489,6 +500,9 @@ func populateDefaults(options CorrelatorOptions) CorrelatorOptions {
|
|||||||
if options.Clock == nil {
|
if options.Clock == nil {
|
||||||
options.Clock = clock.RealClock{}
|
options.Clock = clock.RealClock{}
|
||||||
}
|
}
|
||||||
|
if options.SpamKeyFunc == nil {
|
||||||
|
options.SpamKeyFunc = getSpamKey
|
||||||
|
}
|
||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,3 +278,86 @@ func TestEventCorrelator(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEventSpamFilter(t *testing.T) {
|
||||||
|
spamKeyFuncBasedOnObjectsAndReason := func(e *v1.Event) string {
|
||||||
|
return strings.Join([]string{
|
||||||
|
e.Source.Component,
|
||||||
|
e.Source.Host,
|
||||||
|
e.InvolvedObject.Kind,
|
||||||
|
e.InvolvedObject.Namespace,
|
||||||
|
e.InvolvedObject.Name,
|
||||||
|
string(e.InvolvedObject.UID),
|
||||||
|
e.InvolvedObject.APIVersion,
|
||||||
|
e.Reason,
|
||||||
|
},
|
||||||
|
"")
|
||||||
|
}
|
||||||
|
burstSize := 1
|
||||||
|
eventInterval := time.Duration(1) * time.Second
|
||||||
|
originalEvent := makeEvent("original", "i am first", makeObjectReference("Pod", "my-pod", "my-ns"))
|
||||||
|
differentReasonEvent := makeEvent("duplicate", "me again", makeObjectReference("Pod", "my-pod", "my-ns"))
|
||||||
|
spamEvent := makeEvent("original", "me again", makeObjectReference("Pod", "my-pod", "my-ns"))
|
||||||
|
testCases := map[string]struct {
|
||||||
|
newEvent v1.Event
|
||||||
|
expectedEvent v1.Event
|
||||||
|
expectedSkip bool
|
||||||
|
spamKeyFunc EventSpamKeyFunc
|
||||||
|
}{
|
||||||
|
"event should be reported as spam if object reference is the same for default spam filter": {
|
||||||
|
newEvent: differentReasonEvent,
|
||||||
|
expectedSkip: true,
|
||||||
|
},
|
||||||
|
"event should not be reported as spam if object reference is the same, but reason is different for custom spam filter": {
|
||||||
|
newEvent: differentReasonEvent,
|
||||||
|
expectedEvent: differentReasonEvent,
|
||||||
|
expectedSkip: false,
|
||||||
|
spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason,
|
||||||
|
},
|
||||||
|
"event should be reported as spam if object reference and reason is the same, but message is different for custom spam filter": {
|
||||||
|
newEvent: spamEvent,
|
||||||
|
expectedSkip: true,
|
||||||
|
spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for testDescription, testInput := range testCases {
|
||||||
|
|
||||||
|
c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
|
||||||
|
correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{
|
||||||
|
Clock: &c,
|
||||||
|
SpamKeyFunc: testInput.spamKeyFunc,
|
||||||
|
BurstSize: burstSize,
|
||||||
|
})
|
||||||
|
// emitting original event
|
||||||
|
result, err := correlator.EventCorrelate(&originalEvent)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("scenario %v: unexpected error correlating originalEvent %v", testDescription, err)
|
||||||
|
}
|
||||||
|
// if we are skipping the event, we can avoid updating state
|
||||||
|
if !result.Skip {
|
||||||
|
correlator.UpdateState(result.Event)
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err = correlator.EventCorrelate(&testInput.newEvent)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("scenario %v: unexpected error correlating input event %v", testDescription, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify we did not get skip from filter function unexpectedly...
|
||||||
|
if result.Skip != testInput.expectedSkip {
|
||||||
|
t.Errorf("scenario %v: expected skip %v, but got %v", testDescription, testInput.expectedSkip, result.Skip)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// we wanted to actually skip, so no event is needed to validate
|
||||||
|
if testInput.expectedSkip {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// validate event
|
||||||
|
_, err = validateEvent(testDescription, result.Event, &testInput.expectedEvent, t)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("scenario %v: unexpected error validating result %v", testDescription, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user