// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package storage import ( "fmt" "math" "sync" "time" ) // dynamicDelay dynamically calculates the delay at a fixed percentile, based on // delay samples. // // dynamicDelay is goroutine-safe. type dynamicDelay struct { increaseFactor float64 decreaseFactor float64 minDelay time.Duration maxDelay time.Duration value time.Duration // Guards the value mu *sync.RWMutex } // validateDynamicDelayParams ensures, // targetPercentile is a valid fraction (between 0 and 1). // increaseRate is a positive number. // minDelay is less than maxDelay. func validateDynamicDelayParams(targetPercentile, increaseRate float64, minDelay, maxDelay time.Duration) error { if targetPercentile < 0 || targetPercentile > 1 { return fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile) } if increaseRate <= 0 { return fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate) } if minDelay >= maxDelay { return fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay) } return nil } // NewDynamicDelay returns a dynamicDelay. // // targetPercentile is the desired percentile to be computed. For example, a // targetPercentile of 0.99 computes the delay at the 99th percentile. Must be // in the range [0, 1]. // // increaseRate (must be > 0) determines how many increase calls it takes for // Value to double. // // initialDelay is the start value of the delay. // // decrease can never lower the delay past minDelay, increase can never raise // the delay past maxDelay. func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay { if initialDelay < minDelay { initialDelay = minDelay } if initialDelay > maxDelay { initialDelay = maxDelay } // Compute increaseFactor and decreaseFactor such that: // (increaseFactor ^ (1 - targetPercentile)) * (decreaseFactor ^ targetPercentile) = 1 increaseFactor := math.Exp(math.Log(2) / increaseRate) if increaseFactor < 1.001 { increaseFactor = 1.001 } decreaseFactor := math.Exp(-math.Log(increaseFactor) * (1 - targetPercentile) / targetPercentile) if decreaseFactor > 0.9999 { decreaseFactor = 0.9999 } return &dynamicDelay{ increaseFactor: increaseFactor, decreaseFactor: decreaseFactor, minDelay: minDelay, maxDelay: maxDelay, value: initialDelay, mu: &sync.RWMutex{}, } } func (d *dynamicDelay) unsafeIncrease() { v := time.Duration(float64(d.value) * d.increaseFactor) if v > d.maxDelay { d.value = d.maxDelay } else { d.value = v } } // increase notes that the operation took longer than the delay returned by Value. func (d *dynamicDelay) increase() { d.mu.Lock() defer d.mu.Unlock() d.unsafeIncrease() } func (d *dynamicDelay) unsafeDecrease() { v := time.Duration(float64(d.value) * d.decreaseFactor) if v < d.minDelay { d.value = d.minDelay } else { d.value = v } } // decrease notes that the operation completed before the delay returned by getValue. func (d *dynamicDelay) decrease() { d.mu.Lock() defer d.mu.Unlock() d.unsafeDecrease() } // update updates the delay value depending on the specified latency. func (d *dynamicDelay) update(latency time.Duration) { d.mu.Lock() defer d.mu.Unlock() if latency > d.value { d.unsafeIncrease() } else { d.unsafeDecrease() } } // getValue returns the desired delay to wait before retry the operation. func (d *dynamicDelay) getValue() time.Duration { d.mu.RLock() defer d.mu.RUnlock() return d.value } // printDelay prints the state of delay, helpful in debugging. func (d *dynamicDelay) printDelay() { d.mu.RLock() defer d.mu.RUnlock() fmt.Println("IncreaseFactor: ", d.increaseFactor) fmt.Println("DecreaseFactor: ", d.decreaseFactor) fmt.Println("MinDelay: ", d.minDelay) fmt.Println("MaxDelay: ", d.maxDelay) fmt.Println("Value: ", d.value) } // bucketDelayManager wraps dynamicDelay to provide bucket-specific delays. type bucketDelayManager struct { targetPercentile float64 increaseRate float64 initialDelay time.Duration minDelay time.Duration maxDelay time.Duration // delays maps bucket names to their dynamic delay instance. delays map[string]*dynamicDelay // mu guards delays. mu *sync.RWMutex } // newBucketDelayManager returns a new bucketDelayManager instance. func newBucketDelayManager(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelayManager, error) { err := validateDynamicDelayParams(targetPercentile, increaseRate, minDelay, maxDelay) if err != nil { return nil, err } return &bucketDelayManager{ targetPercentile: targetPercentile, increaseRate: increaseRate, initialDelay: initialDelay, minDelay: minDelay, maxDelay: maxDelay, delays: make(map[string]*dynamicDelay), mu: &sync.RWMutex{}, }, nil } // getDelay retrieves the dynamicDelay instance for the given bucket name. If no delay // exists for the bucket, a new one is created with the configured parameters. func (b *bucketDelayManager) getDelay(bucketName string) *dynamicDelay { b.mu.RLock() delay, ok := b.delays[bucketName] b.mu.RUnlock() if !ok { b.mu.Lock() defer b.mu.Unlock() // Check again, as someone might create b/w the execution of mu.RUnlock() and mu.Lock(). delay, ok = b.delays[bucketName] if !ok { // Create a new dynamicDelay for the bucket if it doesn't exist delay = newDynamicDelay(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay) b.delays[bucketName] = delay } } return delay } // increase notes that the operation took longer than the delay for the given bucket. func (b *bucketDelayManager) increase(bucketName string) { b.getDelay(bucketName).increase() } // decrease notes that the operation completed before the delay for the given bucket. func (b *bucketDelayManager) decrease(bucketName string) { b.getDelay(bucketName).decrease() } // update updates the delay value for the bucket depending on the specified latency. func (b *bucketDelayManager) update(bucketName string, latency time.Duration) { b.getDelay(bucketName).update(latency) } // getValue returns the desired delay to wait before retrying the operation for the given bucket. func (b *bucketDelayManager) getValue(bucketName string) time.Duration { return b.getDelay(bucketName).getValue() }