Allow switching rate limiter inside RateLimitedQueue

This commit is contained in:
gmarek
2016-07-13 10:40:22 +02:00
parent c94976570f
commit f6b1c316e9
9 changed files with 98 additions and 20 deletions

View File

@@ -148,8 +148,9 @@ func (q *UniqueQueue) Clear() {
// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
// of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
queue UniqueQueue
limiter flowcontrol.RateLimiter
queue UniqueQueue
limiterLock sync.Mutex
limiter flowcontrol.RateLimiter
}
// Creates new queue which will use given RateLimiter to oversee execution.
@@ -173,6 +174,8 @@ type ActionFunc func(TimedValue) (bool, time.Duration)
// time to execute the next item in the queue.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Head()
q.limiterLock.Lock()
defer q.limiterLock.Unlock()
for ok {
// rate limit the queue checking
if !q.limiter.TryAccept() {
@@ -216,3 +219,35 @@ func (q *RateLimitedTimedQueue) Remove(value string) bool {
func (q *RateLimitedTimedQueue) Clear() {
q.queue.Clear()
}
// SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.
func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
q.limiterLock.Lock()
defer q.limiterLock.Unlock()
if q.limiter.QPS() == newQPS {
return
}
var newLimiter flowcontrol.RateLimiter
if newQPS <= 0 {
newLimiter = flowcontrol.NewFakeNeverRateLimiter()
} else {
newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, evictionRateLimiterBurst)
}
// If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1
// TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep:
// - saturation (percentage of used tokens)
// - number of used tokens
// - number of available tokens
// - something else
for q.limiter.Saturation() > newLimiter.Saturation() {
// Check if we're not using fake limiter
previousSaturation := newLimiter.Saturation()
newLimiter.TryAccept()
// It's a fake limiter
if newLimiter.Saturation() == previousSaturation {
break
}
}
q.limiter.Stop()
q.limiter = newLimiter
}