From 00080d400fe27f11795b654ad1e124311831a0fc Mon Sep 17 00:00:00 2001 From: Aleksandra Gacek Date: Mon, 26 Jul 2021 16:05:18 +0200 Subject: [PATCH] Allow customizing spam filtering in event client library --- .../k8s.io/client-go/tools/record/event.go | 3 + .../client-go/tools/record/events_cache.go | 34 +++++--- .../tools/record/events_cache_test.go | 83 +++++++++++++++++++ 3 files changed, 110 insertions(+), 10 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/record/event.go b/staging/src/k8s.io/client-go/tools/record/event.go index 30a66601989..85690d6fadd 100644 --- a/staging/src/k8s.io/client-go/tools/record/event.go +++ b/staging/src/k8s.io/client-go/tools/record/event.go @@ -82,6 +82,9 @@ type CorrelatorOptions struct { // The clock used by the EventAggregator to allow for testing // If not specified (zero value), clock.RealClock{} will be used Clock clock.Clock + // The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place + // If not specified (zero value), getSpamKey will be used + SpamKeyFunc EventSpamKeyFunc } // EventRecorder knows how to record events on behalf of an EventSource. diff --git a/staging/src/k8s.io/client-go/tools/record/events_cache.go b/staging/src/k8s.io/client-go/tools/record/events_cache.go index 329c7ce5753..50bb9e061f5 100644 --- a/staging/src/k8s.io/client-go/tools/record/events_cache.go +++ b/staging/src/k8s.io/client-go/tools/record/events_cache.go @@ -81,6 +81,9 @@ func getSpamKey(event *v1.Event) string { "") } +// EventSpamKeyFunc is a function that returns unique key based on provided event +type EventSpamKeyFunc func(event *v1.Event) string + // EventFilterFunc is a function that returns true if the event should be skipped type EventFilterFunc func(event *v1.Event) bool @@ -100,15 +103,19 @@ type EventSourceObjectSpamFilter struct { // clock is used to allow for testing over a time interval clock clock.Clock + + // spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events. + spamKeyFunc EventSpamKeyFunc } // NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill. -func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter { +func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter { return &EventSourceObjectSpamFilter{ - cache: lru.New(lruCacheSize), - burst: burst, - qps: qps, - clock: clock, + cache: lru.New(lruCacheSize), + burst: burst, + qps: qps, + clock: clock, + spamKeyFunc: spamKeyFunc, } } @@ -122,8 +129,8 @@ type spamRecord struct { func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool { var record spamRecord - // controls our cached information about this event (source+object) - eventKey := getSpamKey(event) + // controls our cached information about this event + eventKey := f.spamKeyFunc(event) // do we have a record of similar events in our cache? f.Lock() @@ -431,7 +438,7 @@ type EventCorrelateResult struct { // per object of 1 event every 5 minutes to control long-tail of spam. func NewEventCorrelator(clock clock.Clock) *EventCorrelator { cacheSize := maxLruCacheEntries - spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock) + spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey) return &EventCorrelator{ filterFunc: spamFilter.Filter, aggregator: NewEventAggregator( @@ -448,8 +455,12 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator { func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator { optionsWithDefaults := populateDefaults(options) - spamFilter := NewEventSourceObjectSpamFilter(optionsWithDefaults.LRUCacheSize, - optionsWithDefaults.BurstSize, optionsWithDefaults.QPS, optionsWithDefaults.Clock) + spamFilter := NewEventSourceObjectSpamFilter( + optionsWithDefaults.LRUCacheSize, + optionsWithDefaults.BurstSize, + optionsWithDefaults.QPS, + optionsWithDefaults.Clock, + optionsWithDefaults.SpamKeyFunc) return &EventCorrelator{ filterFunc: spamFilter.Filter, aggregator: NewEventAggregator( @@ -489,6 +500,9 @@ func populateDefaults(options CorrelatorOptions) CorrelatorOptions { if options.Clock == nil { options.Clock = clock.RealClock{} } + if options.SpamKeyFunc == nil { + options.SpamKeyFunc = getSpamKey + } return options } diff --git a/staging/src/k8s.io/client-go/tools/record/events_cache_test.go b/staging/src/k8s.io/client-go/tools/record/events_cache_test.go index 8ac8dbfc087..dcb2a7f3c14 100644 --- a/staging/src/k8s.io/client-go/tools/record/events_cache_test.go +++ b/staging/src/k8s.io/client-go/tools/record/events_cache_test.go @@ -278,3 +278,86 @@ func TestEventCorrelator(t *testing.T) { } } } + +func TestEventSpamFilter(t *testing.T) { + spamKeyFuncBasedOnObjectsAndReason := func(e *v1.Event) string { + return strings.Join([]string{ + e.Source.Component, + e.Source.Host, + e.InvolvedObject.Kind, + e.InvolvedObject.Namespace, + e.InvolvedObject.Name, + string(e.InvolvedObject.UID), + e.InvolvedObject.APIVersion, + e.Reason, + }, + "") + } + burstSize := 1 + eventInterval := time.Duration(1) * time.Second + originalEvent := makeEvent("original", "i am first", makeObjectReference("Pod", "my-pod", "my-ns")) + differentReasonEvent := makeEvent("duplicate", "me again", makeObjectReference("Pod", "my-pod", "my-ns")) + spamEvent := makeEvent("original", "me again", makeObjectReference("Pod", "my-pod", "my-ns")) + testCases := map[string]struct { + newEvent v1.Event + expectedEvent v1.Event + expectedSkip bool + spamKeyFunc EventSpamKeyFunc + }{ + "event should be reported as spam if object reference is the same for default spam filter": { + newEvent: differentReasonEvent, + expectedSkip: true, + }, + "event should not be reported as spam if object reference is the same, but reason is different for custom spam filter": { + newEvent: differentReasonEvent, + expectedEvent: differentReasonEvent, + expectedSkip: false, + spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason, + }, + "event should be reported as spam if object reference and reason is the same, but message is different for custom spam filter": { + newEvent: spamEvent, + expectedSkip: true, + spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason, + }, + } + for testDescription, testInput := range testCases { + + c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval} + correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{ + Clock: &c, + SpamKeyFunc: testInput.spamKeyFunc, + BurstSize: burstSize, + }) + // emitting original event + result, err := correlator.EventCorrelate(&originalEvent) + if err != nil { + t.Errorf("scenario %v: unexpected error correlating originalEvent %v", testDescription, err) + } + // if we are skipping the event, we can avoid updating state + if !result.Skip { + correlator.UpdateState(result.Event) + } + + result, err = correlator.EventCorrelate(&testInput.newEvent) + if err != nil { + t.Errorf("scenario %v: unexpected error correlating input event %v", testDescription, err) + } + + // verify we did not get skip from filter function unexpectedly... + if result.Skip != testInput.expectedSkip { + t.Errorf("scenario %v: expected skip %v, but got %v", testDescription, testInput.expectedSkip, result.Skip) + continue + } + + // we wanted to actually skip, so no event is needed to validate + if testInput.expectedSkip { + continue + } + + // validate event + _, err = validateEvent(testDescription, result.Event, &testInput.expectedEvent, t) + if err != nil { + t.Errorf("scenario %v: unexpected error validating result %v", testDescription, err) + } + } +}