Files
Milos Gajdos 3996413f46 Bump google storage module
Also bump the golangci version

Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
2024-10-26 18:19:46 +01:00

238 lines
6.9 KiB
Go

// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"fmt"
"math"
"sync"
"time"
)
// dynamicDelay holds a delay that is scaled up or down as delay samples
// arrive, so that it tracks a fixed percentile of those samples.
//
// The methods that take mu (increase, decrease, update, getValue, printDelay)
// may be called from multiple goroutines.
type dynamicDelay struct {
	// increaseFactor and decreaseFactor are the multipliers applied to value
	// by unsafeIncrease and unsafeDecrease respectively.
	increaseFactor, decreaseFactor float64

	// minDelay and maxDelay are the lower and upper bounds for value.
	minDelay, maxDelay time.Duration

	// value is the current delay.
	value time.Duration

	// mu guards value.
	mu *sync.RWMutex
}
// validateDynamicDelayParams returns an error unless all of the following hold:
//   - targetPercentile is a fraction within [0, 1];
//   - increaseRate is strictly positive;
//   - minDelay is strictly smaller than maxDelay.
//
// NaN is rejected explicitly for both floating-point parameters: every ordered
// comparison against NaN is false, so the range checks alone would let it
// through.
func validateDynamicDelayParams(targetPercentile, increaseRate float64, minDelay, maxDelay time.Duration) error {
	if math.IsNaN(targetPercentile) || targetPercentile < 0 || targetPercentile > 1 {
		return fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
	}
	if math.IsNaN(increaseRate) || increaseRate <= 0 {
		return fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
	}
	if minDelay >= maxDelay {
		return fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
	}
	return nil
}
// newDynamicDelay returns a dynamicDelay whose value starts at initialDelay,
// clamped into [minDelay, maxDelay].
//
// targetPercentile is the percentile the delay should track; for example, a
// targetPercentile of 0.99 tracks the 99th percentile. It is expected to be in
// the range [0, 1].
//
// increaseRate is expected to be > 0 and determines how many increase calls it
// takes for the value to double.
//
// The parameters are not validated here; validateDynamicDelayParams performs
// those checks.
//
// decrease never lowers the delay below minDelay and increase never raises it
// above maxDelay.
func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay {
	start := initialDelay
	if start < minDelay {
		start = minDelay
	}
	if start > maxDelay {
		start = maxDelay
	}

	// The two factors are derived so that they balance at the target percentile:
	//   (up ^ (1 - targetPercentile)) * (down ^ targetPercentile) = 1
	// Afterwards up is floored at 1.001 and down is capped at 0.9999.
	up := math.Max(math.Exp(math.Log(2)/increaseRate), 1.001)
	down := math.Min(math.Exp(-math.Log(up)*(1-targetPercentile)/targetPercentile), 0.9999)

	return &dynamicDelay{
		increaseFactor: up,
		decreaseFactor: down,
		minDelay:       minDelay,
		maxDelay:       maxDelay,
		value:          start,
		mu:             &sync.RWMutex{},
	}
}
// unsafeIncrease multiplies value by increaseFactor, saturating at maxDelay.
//
// It does no locking of its own; the callers in this file hold mu for writing
// while calling it.
//
// The bound is checked on the float64 product before converting back to
// time.Duration. Converting a float64 that does not fit into the integer type
// yields an implementation-dependent result in Go, so converting first could
// produce a value (for example a negative one) that slips past the maxDelay
// comparison.
func (d *dynamicDelay) unsafeIncrease() {
	scaled := float64(d.value) * d.increaseFactor
	if scaled < float64(d.maxDelay) {
		d.value = time.Duration(scaled)
		return
	}
	// Reached when the product is at or above the bound, and also when it is
	// NaN (e.g. 0 * +Inf), since every comparison with NaN is false.
	d.value = d.maxDelay
}
// increase records that an operation took longer than the delay reported by
// getValue, scaling the delay up while holding the write lock.
func (d *dynamicDelay) increase() {
	mu := d.mu
	mu.Lock()
	defer mu.Unlock()

	d.unsafeIncrease()
}
// unsafeDecrease multiplies value by decreaseFactor, never going below
// minDelay.
//
// It does no locking of its own; the callers in this file hold mu for writing
// while calling it.
func (d *dynamicDelay) unsafeDecrease() {
	next := time.Duration(float64(d.value) * d.decreaseFactor)
	if next < d.minDelay {
		next = d.minDelay
	}
	d.value = next
}
// decrease records that an operation completed before the delay reported by
// getValue, scaling the delay down while holding the write lock.
func (d *dynamicDelay) decrease() {
	mu := d.mu
	mu.Lock()
	defer mu.Unlock()

	d.unsafeDecrease()
}
// update adjusts the delay from a single latency sample: a latency strictly
// greater than the current value scales the delay up, anything else scales it
// down. The comparison and the adjustment happen under one write lock.
func (d *dynamicDelay) update(latency time.Duration) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if latency <= d.value {
		d.unsafeDecrease()
		return
	}
	d.unsafeIncrease()
}
// getValue returns the current delay, i.e. how long to wait before retrying
// the operation. The value is read under the read lock.
func (d *dynamicDelay) getValue() time.Duration {
	d.mu.RLock()
	current := d.value
	d.mu.RUnlock()
	return current
}
// printDelay writes every field of the delay to standard output, one per
// line, while holding the read lock. Intended as a debugging aid.
func (d *dynamicDelay) printDelay() {
	d.mu.RLock()
	defer d.mu.RUnlock()

	fields := []struct {
		label string
		val   interface{}
	}{
		{"IncreaseFactor: ", d.increaseFactor},
		{"DecreaseFactor: ", d.decreaseFactor},
		{"MinDelay: ", d.minDelay},
		{"MaxDelay: ", d.maxDelay},
		{"Value: ", d.value},
	}
	for _, f := range fields {
		fmt.Println(f.label, f.val)
	}
}
// bucketDelayManager keeps one dynamicDelay per bucket name, all created from
// the same set of parameters.
type bucketDelayManager struct {
	// targetPercentile and increaseRate are passed to newDynamicDelay for
	// every bucket.
	targetPercentile, increaseRate float64

	// initialDelay, minDelay and maxDelay are passed to newDynamicDelay for
	// every bucket.
	initialDelay, minDelay, maxDelay time.Duration

	// delays maps a bucket name to its dynamicDelay instance.
	delays map[string]*dynamicDelay

	// mu guards delays.
	mu *sync.RWMutex
}
// newBucketDelayManager validates the parameters with
// validateDynamicDelayParams and, if they are acceptable, returns a
// bucketDelayManager with an empty bucket map. On a validation failure it
// returns nil and the validation error.
func newBucketDelayManager(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelayManager, error) {
	if err := validateDynamicDelayParams(targetPercentile, increaseRate, minDelay, maxDelay); err != nil {
		return nil, err
	}

	manager := &bucketDelayManager{
		targetPercentile: targetPercentile,
		increaseRate:     increaseRate,
		initialDelay:     initialDelay,
		minDelay:         minDelay,
		maxDelay:         maxDelay,
		delays:           make(map[string]*dynamicDelay),
		mu:               &sync.RWMutex{},
	}
	return manager, nil
}
// getDelay returns the dynamicDelay registered for bucketName, creating and
// registering one from the manager's parameters when none exists yet.
func (b *bucketDelayManager) getDelay(bucketName string) *dynamicDelay {
	// Common case: the bucket is already known, so a read lock is enough.
	b.mu.RLock()
	existing, found := b.delays[bucketName]
	b.mu.RUnlock()
	if found {
		return existing
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	// Look again under the write lock: another goroutine may have added the
	// entry between the RUnlock above and the Lock here.
	if existing, found = b.delays[bucketName]; found {
		return existing
	}

	created := newDynamicDelay(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay)
	b.delays[bucketName] = created
	return created
}
// increase records, for the given bucket, that an operation took longer than
// that bucket's current delay. The bucket's delay is created on first use.
func (b *bucketDelayManager) increase(bucketName string) {
	delay := b.getDelay(bucketName)
	delay.increase()
}
// decrease records, for the given bucket, that an operation completed before
// that bucket's current delay. The bucket's delay is created on first use.
func (b *bucketDelayManager) decrease(bucketName string) {
	delay := b.getDelay(bucketName)
	delay.decrease()
}
// update feeds a latency sample to the given bucket's delay, which scales
// itself up or down accordingly. The bucket's delay is created on first use.
func (b *bucketDelayManager) update(bucketName string, latency time.Duration) {
	delay := b.getDelay(bucketName)
	delay.update(latency)
}
// getValue returns the current delay for the given bucket, i.e. how long to
// wait before retrying an operation on it. The bucket's delay is created on
// first use.
func (b *bucketDelayManager) getValue(bucketName string) time.Duration {
	delay := b.getDelay(bucketName)
	return delay.getValue()
}