mirror of
https://github.com/kubernetes/client-go.git
synced 2025-08-30 13:23:28 +00:00
Merge pull request #103918 from olagacek/master
Allow customizing spam filtering in event client library Kubernetes-commit: 582c6f6df71d8c59de5b8b5a2363d13ffcc42b14
This commit is contained in:
commit
19db02e788
4
go.mod
4
go.mod
@ -31,7 +31,7 @@ require (
|
||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
|
||||
google.golang.org/protobuf v1.26.0
|
||||
k8s.io/api v0.0.0-20210809160315-dea726542644
|
||||
k8s.io/apimachinery v0.0.0-20210811014858-805eea8c2686
|
||||
k8s.io/apimachinery v0.0.0-20210811080139-1af25b613b64
|
||||
k8s.io/klog/v2 v2.9.0
|
||||
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e
|
||||
k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9
|
||||
@ -41,5 +41,5 @@ require (
|
||||
|
||||
replace (
|
||||
k8s.io/api => k8s.io/api v0.0.0-20210809160315-dea726542644
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210811014858-805eea8c2686
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210811080139-1af25b613b64
|
||||
)
|
||||
|
4
go.sum
4
go.sum
@ -601,8 +601,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
|
||||
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
k8s.io/api v0.0.0-20210809160315-dea726542644 h1:YRfN12o4Gy+EnHsggzJqLREq5fu+9uLUY1Gqxc6qA1w=
|
||||
k8s.io/api v0.0.0-20210809160315-dea726542644/go.mod h1:f1PcAK6yXpQLNV3CDbnCgD/WzlPgQy+B5Myoibc15MQ=
|
||||
k8s.io/apimachinery v0.0.0-20210811014858-805eea8c2686 h1:fNTzCy7loE3knPNEGR9OjR1yXPBiP+3S/7dCkpGYQtw=
|
||||
k8s.io/apimachinery v0.0.0-20210811014858-805eea8c2686/go.mod h1:Rk+pTpcX+fNVsc3nfpy3fDaMhRjZTuFMcjsATCvV6Uk=
|
||||
k8s.io/apimachinery v0.0.0-20210811080139-1af25b613b64 h1:nQCb3+Rke2THnETOqf9o9cAj6/hLFj7lj3KvcUp53kg=
|
||||
k8s.io/apimachinery v0.0.0-20210811080139-1af25b613b64/go.mod h1:Rk+pTpcX+fNVsc3nfpy3fDaMhRjZTuFMcjsATCvV6Uk=
|
||||
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
||||
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
||||
k8s.io/klog/v2 v2.9.0 h1:D7HV+n1V57XeZ0m6tdRkfknthUaM06VFbWldOFh8kzM=
|
||||
|
@ -82,6 +82,9 @@ type CorrelatorOptions struct {
|
||||
// The clock used by the EventAggregator to allow for testing
|
||||
// If not specified (zero value), clock.RealClock{} will be used
|
||||
Clock clock.Clock
|
||||
// The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place
|
||||
// If not specified (zero value), getSpamKey will be used
|
||||
SpamKeyFunc EventSpamKeyFunc
|
||||
}
|
||||
|
||||
// EventRecorder knows how to record events on behalf of an EventSource.
|
||||
|
@ -81,6 +81,9 @@ func getSpamKey(event *v1.Event) string {
|
||||
"")
|
||||
}
|
||||
|
||||
// EventSpamKeyFunc is a function that returns unique key based on provided event
|
||||
type EventSpamKeyFunc func(event *v1.Event) string
|
||||
|
||||
// EventFilterFunc is a function that returns true if the event should be skipped
|
||||
type EventFilterFunc func(event *v1.Event) bool
|
||||
|
||||
@ -100,15 +103,19 @@ type EventSourceObjectSpamFilter struct {
|
||||
|
||||
// clock is used to allow for testing over a time interval
|
||||
clock clock.Clock
|
||||
|
||||
// spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events.
|
||||
spamKeyFunc EventSpamKeyFunc
|
||||
}
|
||||
|
||||
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
|
||||
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
|
||||
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
|
||||
return &EventSourceObjectSpamFilter{
|
||||
cache: lru.New(lruCacheSize),
|
||||
burst: burst,
|
||||
qps: qps,
|
||||
clock: clock,
|
||||
cache: lru.New(lruCacheSize),
|
||||
burst: burst,
|
||||
qps: qps,
|
||||
clock: clock,
|
||||
spamKeyFunc: spamKeyFunc,
|
||||
}
|
||||
}
|
||||
|
||||
@ -122,8 +129,8 @@ type spamRecord struct {
|
||||
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
|
||||
var record spamRecord
|
||||
|
||||
// controls our cached information about this event (source+object)
|
||||
eventKey := getSpamKey(event)
|
||||
// controls our cached information about this event
|
||||
eventKey := f.spamKeyFunc(event)
|
||||
|
||||
// do we have a record of similar events in our cache?
|
||||
f.Lock()
|
||||
@ -431,7 +438,7 @@ type EventCorrelateResult struct {
|
||||
// per object of 1 event every 5 minutes to control long-tail of spam.
|
||||
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
||||
cacheSize := maxLruCacheEntries
|
||||
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
|
||||
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
|
||||
return &EventCorrelator{
|
||||
filterFunc: spamFilter.Filter,
|
||||
aggregator: NewEventAggregator(
|
||||
@ -448,8 +455,12 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
||||
|
||||
func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator {
|
||||
optionsWithDefaults := populateDefaults(options)
|
||||
spamFilter := NewEventSourceObjectSpamFilter(optionsWithDefaults.LRUCacheSize,
|
||||
optionsWithDefaults.BurstSize, optionsWithDefaults.QPS, optionsWithDefaults.Clock)
|
||||
spamFilter := NewEventSourceObjectSpamFilter(
|
||||
optionsWithDefaults.LRUCacheSize,
|
||||
optionsWithDefaults.BurstSize,
|
||||
optionsWithDefaults.QPS,
|
||||
optionsWithDefaults.Clock,
|
||||
optionsWithDefaults.SpamKeyFunc)
|
||||
return &EventCorrelator{
|
||||
filterFunc: spamFilter.Filter,
|
||||
aggregator: NewEventAggregator(
|
||||
@ -489,6 +500,9 @@ func populateDefaults(options CorrelatorOptions) CorrelatorOptions {
|
||||
if options.Clock == nil {
|
||||
options.Clock = clock.RealClock{}
|
||||
}
|
||||
if options.SpamKeyFunc == nil {
|
||||
options.SpamKeyFunc = getSpamKey
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
|
@ -278,3 +278,86 @@ func TestEventCorrelator(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventSpamFilter(t *testing.T) {
|
||||
spamKeyFuncBasedOnObjectsAndReason := func(e *v1.Event) string {
|
||||
return strings.Join([]string{
|
||||
e.Source.Component,
|
||||
e.Source.Host,
|
||||
e.InvolvedObject.Kind,
|
||||
e.InvolvedObject.Namespace,
|
||||
e.InvolvedObject.Name,
|
||||
string(e.InvolvedObject.UID),
|
||||
e.InvolvedObject.APIVersion,
|
||||
e.Reason,
|
||||
},
|
||||
"")
|
||||
}
|
||||
burstSize := 1
|
||||
eventInterval := time.Duration(1) * time.Second
|
||||
originalEvent := makeEvent("original", "i am first", makeObjectReference("Pod", "my-pod", "my-ns"))
|
||||
differentReasonEvent := makeEvent("duplicate", "me again", makeObjectReference("Pod", "my-pod", "my-ns"))
|
||||
spamEvent := makeEvent("original", "me again", makeObjectReference("Pod", "my-pod", "my-ns"))
|
||||
testCases := map[string]struct {
|
||||
newEvent v1.Event
|
||||
expectedEvent v1.Event
|
||||
expectedSkip bool
|
||||
spamKeyFunc EventSpamKeyFunc
|
||||
}{
|
||||
"event should be reported as spam if object reference is the same for default spam filter": {
|
||||
newEvent: differentReasonEvent,
|
||||
expectedSkip: true,
|
||||
},
|
||||
"event should not be reported as spam if object reference is the same, but reason is different for custom spam filter": {
|
||||
newEvent: differentReasonEvent,
|
||||
expectedEvent: differentReasonEvent,
|
||||
expectedSkip: false,
|
||||
spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason,
|
||||
},
|
||||
"event should be reported as spam if object reference and reason is the same, but message is different for custom spam filter": {
|
||||
newEvent: spamEvent,
|
||||
expectedSkip: true,
|
||||
spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason,
|
||||
},
|
||||
}
|
||||
for testDescription, testInput := range testCases {
|
||||
|
||||
c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
|
||||
correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{
|
||||
Clock: &c,
|
||||
SpamKeyFunc: testInput.spamKeyFunc,
|
||||
BurstSize: burstSize,
|
||||
})
|
||||
// emitting original event
|
||||
result, err := correlator.EventCorrelate(&originalEvent)
|
||||
if err != nil {
|
||||
t.Errorf("scenario %v: unexpected error correlating originalEvent %v", testDescription, err)
|
||||
}
|
||||
// if we are skipping the event, we can avoid updating state
|
||||
if !result.Skip {
|
||||
correlator.UpdateState(result.Event)
|
||||
}
|
||||
|
||||
result, err = correlator.EventCorrelate(&testInput.newEvent)
|
||||
if err != nil {
|
||||
t.Errorf("scenario %v: unexpected error correlating input event %v", testDescription, err)
|
||||
}
|
||||
|
||||
// verify we did not get skip from filter function unexpectedly...
|
||||
if result.Skip != testInput.expectedSkip {
|
||||
t.Errorf("scenario %v: expected skip %v, but got %v", testDescription, testInput.expectedSkip, result.Skip)
|
||||
continue
|
||||
}
|
||||
|
||||
// we wanted to actually skip, so no event is needed to validate
|
||||
if testInput.expectedSkip {
|
||||
continue
|
||||
}
|
||||
|
||||
// validate event
|
||||
_, err = validateEvent(testDescription, result.Event, &testInput.expectedEvent, t)
|
||||
if err != nil {
|
||||
t.Errorf("scenario %v: unexpected error validating result %v", testDescription, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user