mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-21 02:57:47 +00:00
Refactor client-go/util/flowcontrol/throttle.go RateLimiter
- Introduce PassiveRateLimiter which implements all methods of previous RateLimiter except Accept() and Wait() - Change RateLimiter interface to extend PassiveRateLimiter by additionally implementing Accept() and Wait() - Make client-go/tools/record use PassiveRateLimiter Refactor EventSourceObjectSpamFilter, EventAggregator, EventCorrelator - EventSourceObjectSpamFilter, EventAggregator, EventCorrelator use clock.PassiveClock now. - This won't be a breaking change because even if a clock.Clock is passed, it still implements the clock.PassiveClock interface. - Extend clock.PassiveClock through Clock. - Replace pacakge local implementation of realClock with clock.RealClock - In flowcontrol/throttle.go split tokenBucketRateLimiters to use Clock and clock.PassiveClock. - Migrate client-go/tools/record tests from using IntervalClock to using SimpleIntervalClock (honest implementation of clock.PassiveClock) Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com> Kubernetes-commit: ac5c55f0bd853fcf883d9b8e1f5ef728a2fb5309
This commit is contained in:
committed by
Kubernetes Publisher
parent
191e5dc23b
commit
b9fa896d5d
@@ -81,7 +81,7 @@ type CorrelatorOptions struct {
|
||||
MaxIntervalInSeconds int
|
||||
// The clock used by the EventAggregator to allow for testing
|
||||
// If not specified (zero value), clock.RealClock{} will be used
|
||||
Clock clock.Clock
|
||||
Clock clock.PassiveClock
|
||||
// The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place
|
||||
// If not specified (zero value), getSpamKey will be used
|
||||
SpamKeyFunc EventSpamKeyFunc
|
||||
@@ -323,7 +323,7 @@ type recorderImpl struct {
|
||||
scheme *runtime.Scheme
|
||||
source v1.EventSource
|
||||
*watch.Broadcaster
|
||||
clock clock.Clock
|
||||
clock clock.PassiveClock
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
|
||||
|
@@ -34,6 +34,7 @@ import (
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
ref "k8s.io/client-go/tools/reference"
|
||||
testclocks "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
type testEventSink struct {
|
||||
@@ -438,7 +439,7 @@ func TestWriteEventError(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
|
||||
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
|
||||
eventCorrelator := NewEventCorrelator(&clock)
|
||||
|
||||
for caseName, ent := range table {
|
||||
@@ -461,7 +462,7 @@ func TestWriteEventError(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdateExpiredEvent(t *testing.T) {
|
||||
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
|
||||
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
|
||||
eventCorrelator := NewEventCorrelator(&clock)
|
||||
|
||||
var createdEvent *v1.Event
|
||||
|
@@ -102,14 +102,14 @@ type EventSourceObjectSpamFilter struct {
|
||||
qps float32
|
||||
|
||||
// clock is used to allow for testing over a time interval
|
||||
clock clock.Clock
|
||||
clock clock.PassiveClock
|
||||
|
||||
// spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events.
|
||||
spamKeyFunc EventSpamKeyFunc
|
||||
}
|
||||
|
||||
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
|
||||
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
|
||||
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.PassiveClock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
|
||||
return &EventSourceObjectSpamFilter{
|
||||
cache: lru.New(lruCacheSize),
|
||||
burst: burst,
|
||||
@@ -122,7 +122,7 @@ func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock
|
||||
// spamRecord holds data used to perform spam filtering decisions.
|
||||
type spamRecord struct {
|
||||
// rateLimiter controls the rate of events about this object
|
||||
rateLimiter flowcontrol.RateLimiter
|
||||
rateLimiter flowcontrol.PassiveRateLimiter
|
||||
}
|
||||
|
||||
// Filter controls that a given source+object are not exceeding the allowed rate.
|
||||
@@ -142,7 +142,7 @@ func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
|
||||
|
||||
// verify we have a rate limiter for this record
|
||||
if record.rateLimiter == nil {
|
||||
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
|
||||
record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
|
||||
}
|
||||
|
||||
// ensure we have available rate
|
||||
@@ -207,12 +207,12 @@ type EventAggregator struct {
|
||||
maxIntervalInSeconds uint
|
||||
|
||||
// clock is used to allow for testing over a time interval
|
||||
clock clock.Clock
|
||||
clock clock.PassiveClock
|
||||
}
|
||||
|
||||
// NewEventAggregator returns a new instance of an EventAggregator
|
||||
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
|
||||
maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
|
||||
maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator {
|
||||
return &EventAggregator{
|
||||
cache: lru.New(lruCacheSize),
|
||||
keyFunc: keyFunc,
|
||||
@@ -315,11 +315,11 @@ type eventLog struct {
|
||||
type eventLogger struct {
|
||||
sync.RWMutex
|
||||
cache *lru.Cache
|
||||
clock clock.Clock
|
||||
clock clock.PassiveClock
|
||||
}
|
||||
|
||||
// newEventLogger observes events and counts their frequencies
|
||||
func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
|
||||
func newEventLogger(lruCacheEntries int, clock clock.PassiveClock) *eventLogger {
|
||||
return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
|
||||
}
|
||||
|
||||
@@ -436,7 +436,7 @@ type EventCorrelateResult struct {
|
||||
// times.
|
||||
// * A source may burst 25 events about an object, but has a refill rate budget
|
||||
// per object of 1 event every 5 minutes to control long-tail of spam.
|
||||
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
|
||||
func NewEventCorrelator(clock clock.PassiveClock) *EventCorrelator {
|
||||
cacheSize := maxLruCacheEntries
|
||||
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
|
||||
return &EventCorrelator{
|
||||
|
@@ -25,8 +25,8 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
testclocks "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
func makeObjectReference(kind, name, namespace string) v1.ObjectReference {
|
||||
@@ -234,7 +234,7 @@ func TestEventCorrelator(t *testing.T) {
|
||||
|
||||
for testScenario, testInput := range scenario {
|
||||
eventInterval := time.Duration(testInput.intervalSeconds) * time.Second
|
||||
clock := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
|
||||
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval}
|
||||
correlator := NewEventCorrelator(&clock)
|
||||
for i := range testInput.previousEvents {
|
||||
event := testInput.previousEvents[i]
|
||||
@@ -320,9 +320,9 @@ func TestEventSpamFilter(t *testing.T) {
|
||||
spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason,
|
||||
},
|
||||
}
|
||||
for testDescription, testInput := range testCases {
|
||||
|
||||
c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
|
||||
for testDescription, testInput := range testCases {
|
||||
c := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval}
|
||||
correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{
|
||||
Clock: &c,
|
||||
SpamKeyFunc: testInput.spamKeyFunc,
|
||||
|
Reference in New Issue
Block a user