diff --git a/go.mod b/go.mod index 037ae47e..6fa34ff9 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac google.golang.org/protobuf v1.26.0 k8s.io/api v0.0.0-20210809160315-dea726542644 - k8s.io/apimachinery v0.0.0-20210811014858-805eea8c2686 + k8s.io/apimachinery v0.0.0-20210811080139-1af25b613b64 k8s.io/klog/v2 v2.9.0 k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9 @@ -41,5 +41,5 @@ require ( replace ( k8s.io/api => k8s.io/api v0.0.0-20210809160315-dea726542644 - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210811014858-805eea8c2686 + k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210811080139-1af25b613b64 ) diff --git a/go.sum b/go.sum index 38c512b1..b7c04a89 100644 --- a/go.sum +++ b/go.sum @@ -601,8 +601,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.0.0-20210809160315-dea726542644 h1:YRfN12o4Gy+EnHsggzJqLREq5fu+9uLUY1Gqxc6qA1w= k8s.io/api v0.0.0-20210809160315-dea726542644/go.mod h1:f1PcAK6yXpQLNV3CDbnCgD/WzlPgQy+B5Myoibc15MQ= -k8s.io/apimachinery v0.0.0-20210811014858-805eea8c2686 h1:fNTzCy7loE3knPNEGR9OjR1yXPBiP+3S/7dCkpGYQtw= -k8s.io/apimachinery v0.0.0-20210811014858-805eea8c2686/go.mod h1:Rk+pTpcX+fNVsc3nfpy3fDaMhRjZTuFMcjsATCvV6Uk= +k8s.io/apimachinery v0.0.0-20210811080139-1af25b613b64 h1:nQCb3+Rke2THnETOqf9o9cAj6/hLFj7lj3KvcUp53kg= +k8s.io/apimachinery v0.0.0-20210811080139-1af25b613b64/go.mod h1:Rk+pTpcX+fNVsc3nfpy3fDaMhRjZTuFMcjsATCvV6Uk= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.9.0 h1:D7HV+n1V57XeZ0m6tdRkfknthUaM06VFbWldOFh8kzM= diff --git a/tools/record/event.go b/tools/record/event.go index 30a66601..85690d6f 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -82,6 +82,9 @@ type CorrelatorOptions struct { // The clock used by the EventAggregator to allow for testing // If not specified (zero value), clock.RealClock{} will be used Clock clock.Clock + // The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place + // If not specified (zero value), getSpamKey will be used + SpamKeyFunc EventSpamKeyFunc } // EventRecorder knows how to record events on behalf of an EventSource. diff --git a/tools/record/events_cache.go b/tools/record/events_cache.go index 329c7ce5..50bb9e06 100644 --- a/tools/record/events_cache.go +++ b/tools/record/events_cache.go @@ -81,6 +81,9 @@ func getSpamKey(event *v1.Event) string { "") } +// EventSpamKeyFunc is a function that returns unique key based on provided event +type EventSpamKeyFunc func(event *v1.Event) string + // EventFilterFunc is a function that returns true if the event should be skipped type EventFilterFunc func(event *v1.Event) bool @@ -100,15 +103,19 @@ type EventSourceObjectSpamFilter struct { // clock is used to allow for testing over a time interval clock clock.Clock + + // spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events. + spamKeyFunc EventSpamKeyFunc } // NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill. -func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter { +func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter { return &EventSourceObjectSpamFilter{ - cache: lru.New(lruCacheSize), - burst: burst, - qps: qps, - clock: clock, + cache: lru.New(lruCacheSize), + burst: burst, + qps: qps, + clock: clock, + spamKeyFunc: spamKeyFunc, } } @@ -122,8 +129,8 @@ type spamRecord struct { func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool { var record spamRecord - // controls our cached information about this event (source+object) - eventKey := getSpamKey(event) + // controls our cached information about this event + eventKey := f.spamKeyFunc(event) // do we have a record of similar events in our cache? f.Lock() @@ -431,7 +438,7 @@ type EventCorrelateResult struct { // per object of 1 event every 5 minutes to control long-tail of spam. func NewEventCorrelator(clock clock.Clock) *EventCorrelator { cacheSize := maxLruCacheEntries - spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock) + spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey) return &EventCorrelator{ filterFunc: spamFilter.Filter, aggregator: NewEventAggregator( @@ -448,8 +455,12 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator { func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator { optionsWithDefaults := populateDefaults(options) - spamFilter := NewEventSourceObjectSpamFilter(optionsWithDefaults.LRUCacheSize, - optionsWithDefaults.BurstSize, optionsWithDefaults.QPS, optionsWithDefaults.Clock) + spamFilter := NewEventSourceObjectSpamFilter( + optionsWithDefaults.LRUCacheSize, + optionsWithDefaults.BurstSize, + optionsWithDefaults.QPS, + optionsWithDefaults.Clock, + optionsWithDefaults.SpamKeyFunc) return &EventCorrelator{ filterFunc: spamFilter.Filter, aggregator: NewEventAggregator( @@ -489,6 +500,9 @@ func populateDefaults(options CorrelatorOptions) CorrelatorOptions { if options.Clock == nil { options.Clock = clock.RealClock{} } + if options.SpamKeyFunc == nil { + options.SpamKeyFunc = getSpamKey + } return options } diff --git a/tools/record/events_cache_test.go b/tools/record/events_cache_test.go index 8ac8dbfc..dcb2a7f3 100644 --- a/tools/record/events_cache_test.go +++ b/tools/record/events_cache_test.go @@ -278,3 +278,86 @@ func TestEventCorrelator(t *testing.T) { } } } + +func TestEventSpamFilter(t *testing.T) { + spamKeyFuncBasedOnObjectsAndReason := func(e *v1.Event) string { + return strings.Join([]string{ + e.Source.Component, + e.Source.Host, + e.InvolvedObject.Kind, + e.InvolvedObject.Namespace, + e.InvolvedObject.Name, + string(e.InvolvedObject.UID), + e.InvolvedObject.APIVersion, + e.Reason, + }, + "") + } + burstSize := 1 + eventInterval := time.Duration(1) * time.Second + originalEvent := makeEvent("original", "i am first", makeObjectReference("Pod", "my-pod", "my-ns")) + differentReasonEvent := makeEvent("duplicate", "me again", makeObjectReference("Pod", "my-pod", "my-ns")) + spamEvent := makeEvent("original", "me again", makeObjectReference("Pod", "my-pod", "my-ns")) + testCases := map[string]struct { + newEvent v1.Event + expectedEvent v1.Event + expectedSkip bool + spamKeyFunc EventSpamKeyFunc + }{ + "event should be reported as spam if object reference is the same for default spam filter": { + newEvent: differentReasonEvent, + expectedSkip: true, + }, + "event should not be reported as spam if object reference is the same, but reason is different for custom spam filter": { + newEvent: differentReasonEvent, + expectedEvent: differentReasonEvent, + expectedSkip: false, + spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason, + }, + "event should be reported as spam if object reference and reason is the same, but message is different for custom spam filter": { + newEvent: spamEvent, + expectedSkip: true, + spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason, + }, + } + for testDescription, testInput := range testCases { + + c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval} + correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{ + Clock: &c, + SpamKeyFunc: testInput.spamKeyFunc, + BurstSize: burstSize, + }) + // emitting original event + result, err := correlator.EventCorrelate(&originalEvent) + if err != nil { + t.Errorf("scenario %v: unexpected error correlating originalEvent %v", testDescription, err) + } + // if we are skipping the event, we can avoid updating state + if !result.Skip { + correlator.UpdateState(result.Event) + } + + result, err = correlator.EventCorrelate(&testInput.newEvent) + if err != nil { + t.Errorf("scenario %v: unexpected error correlating input event %v", testDescription, err) + } + + // verify we did not get skip from filter function unexpectedly... + if result.Skip != testInput.expectedSkip { + t.Errorf("scenario %v: expected skip %v, but got %v", testDescription, testInput.expectedSkip, result.Skip) + continue + } + + // we wanted to actually skip, so no event is needed to validate + if testInput.expectedSkip { + continue + } + + // validate event + _, err = validateEvent(testDescription, result.Event, &testInput.expectedEvent, t) + if err != nil { + t.Errorf("scenario %v: unexpected error validating result %v", testDescription, err) + } + } +}