Merge pull request #104578 from MadhavJivrajani/refactor-rate-limiters

Move client-go/tools/record tests away from `IntervalClock` to `SimpleIntervalClock`

Kubernetes-commit: 775c9314adb82419aaffea39f671d8dcee133a3f
This commit is contained in:
Kubernetes Publisher 2021-09-20 15:02:24 -07:00
commit 79916c54e1
6 changed files with 89 additions and 51 deletions

View File

@ -81,7 +81,7 @@ type CorrelatorOptions struct {
MaxIntervalInSeconds int
// The clock used by the EventAggregator to allow for testing
// If not specified (zero value), clock.RealClock{} will be used
Clock clock.Clock
Clock clock.PassiveClock
// The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place
// If not specified (zero value), getSpamKey will be used
SpamKeyFunc EventSpamKeyFunc
@ -323,7 +323,7 @@ type recorderImpl struct {
scheme *runtime.Scheme
source v1.EventSource
*watch.Broadcaster
clock clock.Clock
clock clock.PassiveClock
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {

View File

@ -34,6 +34,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
testclocks "k8s.io/utils/clock/testing"
)
type testEventSink struct {
@ -438,7 +439,7 @@ func TestWriteEventError(t *testing.T) {
},
}
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
eventCorrelator := NewEventCorrelator(&clock)
for caseName, ent := range table {
@ -461,7 +462,7 @@ func TestWriteEventError(t *testing.T) {
}
func TestUpdateExpiredEvent(t *testing.T) {
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
eventCorrelator := NewEventCorrelator(&clock)
var createdEvent *v1.Event

View File

@ -102,14 +102,14 @@ type EventSourceObjectSpamFilter struct {
qps float32
// clock is used to allow for testing over a time interval
clock clock.Clock
clock clock.PassiveClock
// spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events.
spamKeyFunc EventSpamKeyFunc
}
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.PassiveClock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
return &EventSourceObjectSpamFilter{
cache: lru.New(lruCacheSize),
burst: burst,
@ -122,7 +122,7 @@ func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock
// spamRecord holds data used to perform spam filtering decisions.
type spamRecord struct {
// rateLimiter controls the rate of events about this object
rateLimiter flowcontrol.RateLimiter
rateLimiter flowcontrol.PassiveRateLimiter
}
// Filter controls that a given source+object are not exceeding the allowed rate.
@ -142,7 +142,7 @@ func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
// verify we have a rate limiter for this record
if record.rateLimiter == nil {
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
}
// ensure we have available rate
@ -207,12 +207,12 @@ type EventAggregator struct {
maxIntervalInSeconds uint
// clock is used to allow for testing over a time interval
clock clock.Clock
clock clock.PassiveClock
}
// NewEventAggregator returns a new instance of an EventAggregator
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator {
return &EventAggregator{
cache: lru.New(lruCacheSize),
keyFunc: keyFunc,
@ -315,11 +315,11 @@ type eventLog struct {
type eventLogger struct {
sync.RWMutex
cache *lru.Cache
clock clock.Clock
clock clock.PassiveClock
}
// newEventLogger observes events and counts their frequencies
func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
func newEventLogger(lruCacheEntries int, clock clock.PassiveClock) *eventLogger {
return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
}
@ -436,7 +436,7 @@ type EventCorrelateResult struct {
// times.
// * A source may burst 25 events about an object, but has a refill rate budget
// per object of 1 event every 5 minutes to control long-tail of spam.
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
func NewEventCorrelator(clock clock.PassiveClock) *EventCorrelator {
cacheSize := maxLruCacheEntries
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
return &EventCorrelator{

View File

@ -25,8 +25,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/diff"
testclocks "k8s.io/utils/clock/testing"
)
func makeObjectReference(kind, name, namespace string) v1.ObjectReference {
@ -234,7 +234,7 @@ func TestEventCorrelator(t *testing.T) {
for testScenario, testInput := range scenario {
eventInterval := time.Duration(testInput.intervalSeconds) * time.Second
clock := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval}
correlator := NewEventCorrelator(&clock)
for i := range testInput.previousEvents {
event := testInput.previousEvents[i]
@ -320,9 +320,9 @@ func TestEventSpamFilter(t *testing.T) {
spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason,
},
}
for testDescription, testInput := range testCases {
c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
for testDescription, testInput := range testCases {
c := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval}
correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{
Clock: &c,
SpamKeyFunc: testInput.spamKeyFunc,

View File

@ -23,26 +23,36 @@ import (
"time"
"golang.org/x/time/rate"
"k8s.io/utils/clock"
)
type RateLimiter interface {
type PassiveRateLimiter interface {
// TryAccept returns true if a token is taken immediately. Otherwise,
// it returns false.
TryAccept() bool
// Accept returns once a token becomes available.
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// QPS returns QPS of this rate limiter
QPS() float32
}
type RateLimiter interface {
PassiveRateLimiter
// Accept returns once a token becomes available.
Accept()
// Wait returns nil if a token is taken before the Context is done.
Wait(ctx context.Context) error
}
type tokenBucketRateLimiter struct {
type tokenBucketPassiveRateLimiter struct {
limiter *rate.Limiter
clock Clock
qps float32
clock clock.PassiveClock
}
type tokenBucketRateLimiter struct {
tokenBucketPassiveRateLimiter
clock Clock
}
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
@ -52,58 +62,73 @@ type tokenBucketRateLimiter struct {
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, realClock{}, qps)
return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
}
// NewTokenBucketPassiveRateLimiter is similar to NewTokenBucketRateLimiter except that it returns
// a PassiveRateLimiter which does not have Accept() and Wait() methods.
func NewTokenBucketPassiveRateLimiter(qps float32, burst int) PassiveRateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiterWithPassiveClock(limiter, clock.RealClock{}, qps)
}
// An injectable, mockable clock interface.
type Clock interface {
Now() time.Time
clock.PassiveClock
Sleep(time.Duration)
}
type realClock struct{}
func (realClock) Now() time.Time {
return time.Now()
}
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}
var _ Clock = (*clock.RealClock)(nil)
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
// but allows an injectable clock, for testing.
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, c, qps)
return newTokenBucketRateLimiterWithClock(limiter, c, qps)
}
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
// NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
// except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
// and uses a PassiveClock.
func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
}
func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
return &tokenBucketRateLimiter{
limiter: limiter,
clock: c,
qps: qps,
tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
clock: c,
}
}
func (t *tokenBucketRateLimiter) TryAccept() bool {
return t.limiter.AllowN(t.clock.Now(), 1)
func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
return &tokenBucketPassiveRateLimiter{
limiter: limiter,
qps: qps,
clock: c,
}
}
func (tbprl *tokenBucketPassiveRateLimiter) Stop() {
}
func (tbprl *tokenBucketPassiveRateLimiter) QPS() float32 {
return tbprl.qps
}
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
}
// Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() {
now := t.clock.Now()
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
func (tbrl *tokenBucketRateLimiter) Accept() {
now := tbrl.clock.Now()
tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
}
func (t *tokenBucketRateLimiter) Stop() {
}
func (t *tokenBucketRateLimiter) QPS() float32 {
return t.qps
}
func (t *tokenBucketRateLimiter) Wait(ctx context.Context) error {
return t.limiter.Wait(ctx)
func (tbrl *tokenBucketRateLimiter) Wait(ctx context.Context) error {
return tbrl.limiter.Wait(ctx)
}
type fakeAlwaysRateLimiter struct{}
@ -157,3 +182,11 @@ func (t *fakeNeverRateLimiter) QPS() float32 {
func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
return errors.New("can not be accept")
}
var (
_ RateLimiter = (*tokenBucketRateLimiter)(nil)
_ RateLimiter = (*fakeAlwaysRateLimiter)(nil)
_ RateLimiter = (*fakeNeverRateLimiter)(nil)
)
var _ PassiveRateLimiter = (*tokenBucketPassiveRateLimiter)(nil)

View File

@ -190,6 +190,10 @@ func (fc *fakeClock) Sleep(d time.Duration) {
fc.now = fc.now.Add(d)
}
func (fc *fakeClock) Since(ts time.Time) time.Duration {
return time.Since(ts)
}
func TestRatePrecisionBug(t *testing.T) {
// golang.org/x/time/rate used to have bugs around precision and this
// proves that they don't recur (at least in the form we know about). This