mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-29 16:36:54 +00:00
Merge pull request #47367 from derekwaynecarr/event-spam
Automatic merge from submit-queue Add client side event spam filtering **What this PR does / why we need it**: Add client side event spam filtering to stop excessive traffic to api-server from internal cluster components. this pr defines a per source+object event budget of 25 burst with refill of 1 every 5 minutes. i tested this pr on the following scenarios: **Scenario 1: Node with 50 crash-looping pods** ``` $ create 50 crash-looping pods on a single node $ kubectl run bad --image=busybox --replicas=50 --command -- derekisbad ``` Before: * POST events with peak of 1.7 per second, long-tail: 0.2 per second * PATCH events with peak of 5 per second, long-tail: 5 per second After: * POST events with peak of 1.7 per second, long-tail: 0.2 per second * PATCH events with peak of 3.6 per second, long-tail: 0.2 per second Observation: * https://github.com/kubernetes/kubernetes/pull/47462 capped the number of total events in the long-tail as expected, but did nothing to improve total spam of master. **Scenario 2: replication controller limited by quota** ``` $ kubectl create quota my-quota --hard=pods=1 $ kubectl run nginx --image=nginx --replicas=50 ``` Before: * POST events not relevant as aggregation worked well here. * PATCH events with peak and long-tail of 13.6 per second After: * POST events not relevant as aggregation worked well here. * PATCH events with peak: .35 per second, and long-tail of 0 **Which issue this PR fixes** fixes https://github.com/kubernetes/kubernetes/issues/47366 **Special notes for your reviewer**: this was a significant problem in a kube 1.5 cluster we are running where events were co-located in a single etcd. this cluster was normal to have larger numbers of unhealty pods as well as denial by quota. **Release note**: ```release-note add support for client-side spam filtering of events ``` Kubernetes-commit: 870406bec5b6e071c3854298bd357629b2939f7c
This commit is contained in:
commit
bac0212263
50
Godeps/Godeps.json
generated
50
Godeps/Godeps.json
generated
@ -372,103 +372,103 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/admissionregistration/v1alpha1",
|
"ImportPath": "k8s.io/api/admissionregistration/v1alpha1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/apps/v1beta1",
|
"ImportPath": "k8s.io/api/apps/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/apps/v1beta2",
|
"ImportPath": "k8s.io/api/apps/v1beta2",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/authentication/v1",
|
"ImportPath": "k8s.io/api/authentication/v1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/authentication/v1beta1",
|
"ImportPath": "k8s.io/api/authentication/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/authorization/v1",
|
"ImportPath": "k8s.io/api/authorization/v1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/authorization/v1beta1",
|
"ImportPath": "k8s.io/api/authorization/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/autoscaling/v1",
|
"ImportPath": "k8s.io/api/autoscaling/v1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/autoscaling/v2alpha1",
|
"ImportPath": "k8s.io/api/autoscaling/v2alpha1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/batch/v1",
|
"ImportPath": "k8s.io/api/batch/v1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/batch/v1beta1",
|
"ImportPath": "k8s.io/api/batch/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/batch/v2alpha1",
|
"ImportPath": "k8s.io/api/batch/v2alpha1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/certificates/v1beta1",
|
"ImportPath": "k8s.io/api/certificates/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/core/v1",
|
"ImportPath": "k8s.io/api/core/v1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/extensions/v1beta1",
|
"ImportPath": "k8s.io/api/extensions/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/imagepolicy/v1alpha1",
|
"ImportPath": "k8s.io/api/imagepolicy/v1alpha1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/networking/v1",
|
"ImportPath": "k8s.io/api/networking/v1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/policy/v1beta1",
|
"ImportPath": "k8s.io/api/policy/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/rbac/v1",
|
"ImportPath": "k8s.io/api/rbac/v1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/rbac/v1alpha1",
|
"ImportPath": "k8s.io/api/rbac/v1alpha1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/rbac/v1beta1",
|
"ImportPath": "k8s.io/api/rbac/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/scheduling/v1alpha1",
|
"ImportPath": "k8s.io/api/scheduling/v1alpha1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/settings/v1alpha1",
|
"ImportPath": "k8s.io/api/settings/v1alpha1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/storage/v1",
|
"ImportPath": "k8s.io/api/storage/v1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/api/storage/v1beta1",
|
"ImportPath": "k8s.io/api/storage/v1beta1",
|
||||||
"Rev": "2ee411bc5b2481d94ab1ce278ba13d3565927efb"
|
"Rev": "52f36f1d09e9eaf72bdaa3b480e462edc9c4d895"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/apimachinery/pkg/api/equality",
|
"ImportPath": "k8s.io/apimachinery/pkg/api/equality",
|
||||||
|
@ -49,6 +49,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/tools/reference:go_default_library",
|
"//vendor/k8s.io/client-go/tools/reference:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -412,7 +412,8 @@ func TestWriteEventError(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
eventCorrelator := NewEventCorrelator(clock.RealClock{})
|
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
|
||||||
|
eventCorrelator := NewEventCorrelator(&clock)
|
||||||
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
for caseName, ent := range table {
|
for caseName, ent := range table {
|
||||||
@ -435,7 +436,8 @@ func TestWriteEventError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdateExpiredEvent(t *testing.T) {
|
func TestUpdateExpiredEvent(t *testing.T) {
|
||||||
eventCorrelator := NewEventCorrelator(clock.RealClock{})
|
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
|
||||||
|
eventCorrelator := NewEventCorrelator(&clock)
|
||||||
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
var createdEvent *v1.Event
|
var createdEvent *v1.Event
|
||||||
@ -497,14 +499,15 @@ func TestLotsOfEvents(t *testing.T) {
|
|||||||
loggerCalled <- struct{}{}
|
loggerCalled <- struct{}{}
|
||||||
})
|
})
|
||||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "eventTest"})
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "eventTest"})
|
||||||
ref := &v1.ObjectReference{
|
|
||||||
Kind: "Pod",
|
|
||||||
Name: "foo",
|
|
||||||
Namespace: "baz",
|
|
||||||
UID: "bar",
|
|
||||||
APIVersion: "version",
|
|
||||||
}
|
|
||||||
for i := 0; i < maxQueuedEvents; i++ {
|
for i := 0; i < maxQueuedEvents; i++ {
|
||||||
|
// we want a unique object to stop spam filtering
|
||||||
|
ref := &v1.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: fmt.Sprintf("foo-%v", i),
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
APIVersion: "version",
|
||||||
|
}
|
||||||
// we need to vary the reason to prevent aggregation
|
// we need to vary the reason to prevent aggregation
|
||||||
go recorder.Eventf(ref, v1.EventTypeNormal, "Reason-"+string(i), strconv.Itoa(i))
|
go recorder.Eventf(ref, v1.EventTypeNormal, "Reason-"+string(i), strconv.Itoa(i))
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||||
|
"k8s.io/client-go/util/flowcontrol"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -39,6 +40,13 @@ const (
|
|||||||
// more than 10 times in a 10 minute period, aggregate the event
|
// more than 10 times in a 10 minute period, aggregate the event
|
||||||
defaultAggregateMaxEvents = 10
|
defaultAggregateMaxEvents = 10
|
||||||
defaultAggregateIntervalInSeconds = 600
|
defaultAggregateIntervalInSeconds = 600
|
||||||
|
|
||||||
|
// by default, allow a source to send 25 events about an object
|
||||||
|
// but control the refill rate to 1 new event every 5 minutes
|
||||||
|
// this helps control the long-tail of events for things that are always
|
||||||
|
// unhealthy
|
||||||
|
defaultSpamBurst = 25
|
||||||
|
defaultSpamQPS = 1. / 300.
|
||||||
)
|
)
|
||||||
|
|
||||||
// getEventKey builds unique event key based on source, involvedObject, reason, message
|
// getEventKey builds unique event key based on source, involvedObject, reason, message
|
||||||
@ -59,6 +67,20 @@ func getEventKey(event *v1.Event) string {
|
|||||||
"")
|
"")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getSpamKey builds unique event key based on source, involvedObject
|
||||||
|
func getSpamKey(event *v1.Event) string {
|
||||||
|
return strings.Join([]string{
|
||||||
|
event.Source.Component,
|
||||||
|
event.Source.Host,
|
||||||
|
event.InvolvedObject.Kind,
|
||||||
|
event.InvolvedObject.Namespace,
|
||||||
|
event.InvolvedObject.Name,
|
||||||
|
string(event.InvolvedObject.UID),
|
||||||
|
event.InvolvedObject.APIVersion,
|
||||||
|
},
|
||||||
|
"")
|
||||||
|
}
|
||||||
|
|
||||||
// EventFilterFunc is a function that returns true if the event should be skipped
|
// EventFilterFunc is a function that returns true if the event should be skipped
|
||||||
type EventFilterFunc func(event *v1.Event) bool
|
type EventFilterFunc func(event *v1.Event) bool
|
||||||
|
|
||||||
@ -67,6 +89,69 @@ func DefaultEventFilterFunc(event *v1.Event) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EventSourceObjectSpamFilter is responsible for throttling
|
||||||
|
// the amount of events a source and object can produce.
|
||||||
|
type EventSourceObjectSpamFilter struct {
|
||||||
|
sync.RWMutex
|
||||||
|
|
||||||
|
// the cache that manages last synced state
|
||||||
|
cache *lru.Cache
|
||||||
|
|
||||||
|
// burst is the amount of events we allow per source + object
|
||||||
|
burst int
|
||||||
|
|
||||||
|
// qps is the refill rate of the token bucket in queries per second
|
||||||
|
qps float32
|
||||||
|
|
||||||
|
// clock is used to allow for testing over a time interval
|
||||||
|
clock clock.Clock
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
|
||||||
|
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
|
||||||
|
return &EventSourceObjectSpamFilter{
|
||||||
|
cache: lru.New(lruCacheSize),
|
||||||
|
burst: burst,
|
||||||
|
qps: qps,
|
||||||
|
clock: clock,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// spamRecord holds data used to perform spam filtering decisions.
|
||||||
|
type spamRecord struct {
|
||||||
|
// rateLimiter controls the rate of events about this object
|
||||||
|
rateLimiter flowcontrol.RateLimiter
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter controls that a given source+object are not exceeding the allowed rate.
|
||||||
|
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
|
||||||
|
var record spamRecord
|
||||||
|
|
||||||
|
// controls our cached information about this event (source+object)
|
||||||
|
eventKey := getSpamKey(event)
|
||||||
|
|
||||||
|
// do we have a record of similar events in our cache?
|
||||||
|
f.Lock()
|
||||||
|
defer f.Unlock()
|
||||||
|
value, found := f.cache.Get(eventKey)
|
||||||
|
if found {
|
||||||
|
record = value.(spamRecord)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify we have a rate limiter for this record
|
||||||
|
if record.rateLimiter == nil {
|
||||||
|
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure we have available rate
|
||||||
|
filter := !record.rateLimiter.TryAccept()
|
||||||
|
|
||||||
|
// update the cache
|
||||||
|
f.cache.Add(eventKey, record)
|
||||||
|
|
||||||
|
return filter
|
||||||
|
}
|
||||||
|
|
||||||
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
|
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
|
||||||
// It returns a tuple of the following:
|
// It returns a tuple of the following:
|
||||||
// aggregateKey - key the identifies the aggregate group to bucket this event
|
// aggregateKey - key the identifies the aggregate group to bucket this event
|
||||||
@ -337,7 +422,6 @@ type EventCorrelateResult struct {
|
|||||||
// prior to interacting with the API server to record the event.
|
// prior to interacting with the API server to record the event.
|
||||||
//
|
//
|
||||||
// The default behavior is as follows:
|
// The default behavior is as follows:
|
||||||
// * No events are filtered from being recorded
|
|
||||||
// * Aggregation is performed if a similar event is recorded 10 times in a
|
// * Aggregation is performed if a similar event is recorded 10 times in a
|
||||||
// in a 10 minute rolling interval. A similar event is an event that varies only by
|
// in a 10 minute rolling interval. A similar event is an event that varies only by
|
||||||
// the Event.Message field. Rather than recording the precise event, aggregation
|
// the Event.Message field. Rather than recording the precise event, aggregation
|
||||||
@ -345,10 +429,13 @@ type EventCorrelateResult struct {
|
|||||||
// the same reason.
|
// the same reason.
|
||||||
// * Events are incrementally counted if the exact same event is encountered multiple
|
// * Events are incrementally counted if the exact same event is encountered multiple
|
||||||
// times.
|
// times.
|
||||||
|
// * A source may burst 25 events about an object, but has a refill rate budget
|
||||||
|
// per object of 1 event every 5 minutes to control long-tail of spam.
|
||||||
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
||||||
cacheSize := maxLruCacheEntries
|
cacheSize := maxLruCacheEntries
|
||||||
|
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
|
||||||
return &EventCorrelator{
|
return &EventCorrelator{
|
||||||
filterFunc: DefaultEventFilterFunc,
|
filterFunc: spamFilter.Filter,
|
||||||
aggregator: NewEventAggregator(
|
aggregator: NewEventAggregator(
|
||||||
cacheSize,
|
cacheSize,
|
||||||
EventAggregatorByReasonFunc,
|
EventAggregatorByReasonFunc,
|
||||||
@ -363,11 +450,14 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
|||||||
|
|
||||||
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
|
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
|
||||||
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
|
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
|
||||||
if c.filterFunc(newEvent) {
|
if newEvent == nil {
|
||||||
return &EventCorrelateResult{Skip: true}, nil
|
return nil, fmt.Errorf("event is nil")
|
||||||
}
|
}
|
||||||
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
|
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
|
||||||
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
|
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
|
||||||
|
if c.filterFunc(observedEvent) {
|
||||||
|
return &EventCorrelateResult{Skip: true}, nil
|
||||||
|
}
|
||||||
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
|
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,6 +181,7 @@ func TestEventCorrelator(t *testing.T) {
|
|||||||
newEvent v1.Event
|
newEvent v1.Event
|
||||||
expectedEvent v1.Event
|
expectedEvent v1.Event
|
||||||
intervalSeconds int
|
intervalSeconds int
|
||||||
|
expectedSkip bool
|
||||||
}{
|
}{
|
||||||
"create-a-single-event": {
|
"create-a-single-event": {
|
||||||
previousEvents: []v1.Event{},
|
previousEvents: []v1.Event{},
|
||||||
@ -198,7 +199,13 @@ func TestEventCorrelator(t *testing.T) {
|
|||||||
previousEvents: makeEvents(defaultAggregateMaxEvents, duplicateEvent),
|
previousEvents: makeEvents(defaultAggregateMaxEvents, duplicateEvent),
|
||||||
newEvent: duplicateEvent,
|
newEvent: duplicateEvent,
|
||||||
expectedEvent: setCount(duplicateEvent, defaultAggregateMaxEvents+1),
|
expectedEvent: setCount(duplicateEvent, defaultAggregateMaxEvents+1),
|
||||||
intervalSeconds: 5,
|
intervalSeconds: 30, // larger interval induces aggregation but not spam.
|
||||||
|
},
|
||||||
|
"the-same-event-is-spam-if-happens-too-frequently": {
|
||||||
|
previousEvents: makeEvents(defaultSpamBurst+1, duplicateEvent),
|
||||||
|
newEvent: duplicateEvent,
|
||||||
|
expectedSkip: true,
|
||||||
|
intervalSeconds: 1,
|
||||||
},
|
},
|
||||||
"create-many-unique-events": {
|
"create-many-unique-events": {
|
||||||
previousEvents: makeUniqueEvents(30),
|
previousEvents: makeUniqueEvents(30),
|
||||||
@ -245,7 +252,10 @@ func TestEventCorrelator(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("scenario %v: unexpected error playing back prevEvents %v", testScenario, err)
|
t.Errorf("scenario %v: unexpected error playing back prevEvents %v", testScenario, err)
|
||||||
}
|
}
|
||||||
correlator.UpdateState(result.Event)
|
// if we are skipping the event, we can avoid updating state
|
||||||
|
if !result.Skip {
|
||||||
|
correlator.UpdateState(result.Event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the input to current clock value
|
// update the input to current clock value
|
||||||
@ -257,6 +267,18 @@ func TestEventCorrelator(t *testing.T) {
|
|||||||
t.Errorf("scenario %v: unexpected error correlating input event %v", testScenario, err)
|
t.Errorf("scenario %v: unexpected error correlating input event %v", testScenario, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// verify we did not get skip from filter function unexpectedly...
|
||||||
|
if result.Skip != testInput.expectedSkip {
|
||||||
|
t.Errorf("scenario %v: expected skip %v, but got %v", testScenario, testInput.expectedSkip, result.Skip)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// we wanted to actually skip, so no event is needed to validate
|
||||||
|
if testInput.expectedSkip {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// validate event
|
||||||
_, err = validateEvent(testScenario, result.Event, &testInput.expectedEvent, t)
|
_, err = validateEvent(testScenario, result.Event, &testInput.expectedEvent, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("scenario %v: unexpected error validating result %v", testScenario, err)
|
t.Errorf("scenario %v: unexpected error validating result %v", testScenario, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user