mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #116476 from smarterclayton/context_wait_2
wait: Split the wait package up into individual files to make refactors easier
This commit is contained in:
commit
087868a436
339
staging/src/k8s.io/apimachinery/pkg/util/wait/backoff.go
Normal file
339
staging/src/k8s.io/apimachinery/pkg/util/wait/backoff.go
Normal file
@ -0,0 +1,339 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2023 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package wait
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/utils/clock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Backoff holds parameters applied to a Backoff function.
|
||||||
|
type Backoff struct {
|
||||||
|
// The initial duration.
|
||||||
|
Duration time.Duration
|
||||||
|
// Duration is multiplied by factor each iteration, if factor is not zero
|
||||||
|
// and the limits imposed by Steps and Cap have not been reached.
|
||||||
|
// Should not be negative.
|
||||||
|
// The jitter does not contribute to the updates to the duration parameter.
|
||||||
|
Factor float64
|
||||||
|
// The sleep at each iteration is the duration plus an additional
|
||||||
|
// amount chosen uniformly at random from the interval between
|
||||||
|
// zero and `jitter*duration`.
|
||||||
|
Jitter float64
|
||||||
|
// The remaining number of iterations in which the duration
|
||||||
|
// parameter may change (but progress can be stopped earlier by
|
||||||
|
// hitting the cap). If not positive, the duration is not
|
||||||
|
// changed. Used for exponential backoff in combination with
|
||||||
|
// Factor and Cap.
|
||||||
|
Steps int
|
||||||
|
// A limit on revised values of the duration parameter. If a
|
||||||
|
// multiplication by the factor parameter would make the duration
|
||||||
|
// exceed the cap then the duration is set to the cap and the
|
||||||
|
// steps parameter is set to zero.
|
||||||
|
Cap time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step (1) returns an amount of time to sleep determined by the
|
||||||
|
// original Duration and Jitter and (2) mutates the provided Backoff
|
||||||
|
// to update its Steps and Duration.
|
||||||
|
func (b *Backoff) Step() time.Duration {
|
||||||
|
if b.Steps < 1 {
|
||||||
|
if b.Jitter > 0 {
|
||||||
|
return Jitter(b.Duration, b.Jitter)
|
||||||
|
}
|
||||||
|
return b.Duration
|
||||||
|
}
|
||||||
|
b.Steps--
|
||||||
|
|
||||||
|
duration := b.Duration
|
||||||
|
|
||||||
|
// calculate the next step
|
||||||
|
if b.Factor != 0 {
|
||||||
|
b.Duration = time.Duration(float64(b.Duration) * b.Factor)
|
||||||
|
if b.Cap > 0 && b.Duration > b.Cap {
|
||||||
|
b.Duration = b.Cap
|
||||||
|
b.Steps = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Jitter > 0 {
|
||||||
|
duration = Jitter(duration, b.Jitter)
|
||||||
|
}
|
||||||
|
return duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Until loops until stop channel is closed, running f every period.
|
||||||
|
//
|
||||||
|
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
|
||||||
|
// with sliding = true (which means the timer for period starts after the f
|
||||||
|
// completes).
|
||||||
|
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
|
||||||
|
JitterUntil(f, period, 0.0, true, stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UntilWithContext loops until context is done, running f every period.
|
||||||
|
//
|
||||||
|
// UntilWithContext is syntactic sugar on top of JitterUntilWithContext
|
||||||
|
// with zero jitter factor and with sliding = true (which means the timer
|
||||||
|
// for period starts after the f completes).
|
||||||
|
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
|
||||||
|
JitterUntilWithContext(ctx, f, period, 0.0, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NonSlidingUntil loops until stop channel is closed, running f every
|
||||||
|
// period.
|
||||||
|
//
|
||||||
|
// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
|
||||||
|
// factor, with sliding = false (meaning the timer for period starts at the same
|
||||||
|
// time as the function starts).
|
||||||
|
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
|
||||||
|
JitterUntil(f, period, 0.0, false, stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NonSlidingUntilWithContext loops until context is done, running f every
|
||||||
|
// period.
|
||||||
|
//
|
||||||
|
// NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext
|
||||||
|
// with zero jitter factor, with sliding = false (meaning the timer for period
|
||||||
|
// starts at the same time as the function starts).
|
||||||
|
func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
|
||||||
|
JitterUntilWithContext(ctx, f, period, 0.0, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// JitterUntil loops until stop channel is closed, running f every period.
|
||||||
|
//
|
||||||
|
// If jitterFactor is positive, the period is jittered before every run of f.
|
||||||
|
// If jitterFactor is not positive, the period is unchanged and not jittered.
|
||||||
|
//
|
||||||
|
// If sliding is true, the period is computed after f runs. If it is false then
|
||||||
|
// period includes the runtime for f.
|
||||||
|
//
|
||||||
|
// Close stopCh to stop. f may not be invoked if stop channel is already
|
||||||
|
// closed. Pass NeverStop to if you don't want it stop.
|
||||||
|
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
|
||||||
|
BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
|
||||||
|
//
|
||||||
|
// If sliding is true, the period is computed after f runs. If it is false then
|
||||||
|
// period includes the runtime for f.
|
||||||
|
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
|
||||||
|
var t clock.Timer
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if !sliding {
|
||||||
|
t = backoff.Backoff()
|
||||||
|
}
|
||||||
|
|
||||||
|
func() {
|
||||||
|
defer runtime.HandleCrash()
|
||||||
|
f()
|
||||||
|
}()
|
||||||
|
|
||||||
|
if sliding {
|
||||||
|
t = backoff.Backoff()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: b/c there is no priority selection in golang
|
||||||
|
// it is possible for this to race, meaning we could
|
||||||
|
// trigger t.C and stopCh, and t.C select falls through.
|
||||||
|
// In order to mitigate we re-check stopCh at the beginning
|
||||||
|
// of every loop to prevent extra executions of f().
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
if !t.Stop() {
|
||||||
|
<-t.C()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case <-t.C():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// JitterUntilWithContext loops until context is done, running f every period.
|
||||||
|
//
|
||||||
|
// If jitterFactor is positive, the period is jittered before every run of f.
|
||||||
|
// If jitterFactor is not positive, the period is unchanged and not jittered.
|
||||||
|
//
|
||||||
|
// If sliding is true, the period is computed after f runs. If it is false then
|
||||||
|
// period includes the runtime for f.
|
||||||
|
//
|
||||||
|
// Cancel context to stop. f may not be invoked if context is already expired.
|
||||||
|
func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
|
||||||
|
JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
|
||||||
|
}
|
||||||
|
|
||||||
|
// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
|
||||||
|
// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff()
|
||||||
|
// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in
|
||||||
|
// undetermined behavior.
|
||||||
|
// The BackoffManager is supposed to be called in a single-threaded environment.
|
||||||
|
type BackoffManager interface {
|
||||||
|
Backoff() clock.Timer
|
||||||
|
}
|
||||||
|
|
||||||
|
type exponentialBackoffManagerImpl struct {
|
||||||
|
backoff *Backoff
|
||||||
|
backoffTimer clock.Timer
|
||||||
|
lastBackoffStart time.Time
|
||||||
|
initialBackoff time.Duration
|
||||||
|
backoffResetDuration time.Duration
|
||||||
|
clock clock.Clock
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
|
||||||
|
// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
|
||||||
|
// This backoff manager is used to reduce load during upstream unhealthiness.
|
||||||
|
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
|
||||||
|
return &exponentialBackoffManagerImpl{
|
||||||
|
backoff: &Backoff{
|
||||||
|
Duration: initBackoff,
|
||||||
|
Factor: backoffFactor,
|
||||||
|
Jitter: jitter,
|
||||||
|
|
||||||
|
// the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
|
||||||
|
// what we ideally need here, we set it to max int and assume we will never use up the steps
|
||||||
|
Steps: math.MaxInt32,
|
||||||
|
Cap: maxBackoff,
|
||||||
|
},
|
||||||
|
backoffTimer: nil,
|
||||||
|
initialBackoff: initBackoff,
|
||||||
|
lastBackoffStart: c.Now(),
|
||||||
|
backoffResetDuration: resetDuration,
|
||||||
|
clock: c,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
|
||||||
|
if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
|
||||||
|
b.backoff.Steps = math.MaxInt32
|
||||||
|
b.backoff.Duration = b.initialBackoff
|
||||||
|
}
|
||||||
|
b.lastBackoffStart = b.clock.Now()
|
||||||
|
return b.backoff.Step()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
|
||||||
|
// The returned timer must be drained before calling Backoff() the second time
|
||||||
|
func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
|
||||||
|
if b.backoffTimer == nil {
|
||||||
|
b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
|
||||||
|
} else {
|
||||||
|
b.backoffTimer.Reset(b.getNextBackoff())
|
||||||
|
}
|
||||||
|
return b.backoffTimer
|
||||||
|
}
|
||||||
|
|
||||||
|
type jitteredBackoffManagerImpl struct {
|
||||||
|
clock clock.Clock
|
||||||
|
duration time.Duration
|
||||||
|
jitter float64
|
||||||
|
backoffTimer clock.Timer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
|
||||||
|
// is negative, backoff will not be jittered.
|
||||||
|
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
|
||||||
|
return &jitteredBackoffManagerImpl{
|
||||||
|
clock: c,
|
||||||
|
duration: duration,
|
||||||
|
jitter: jitter,
|
||||||
|
backoffTimer: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
|
||||||
|
jitteredPeriod := j.duration
|
||||||
|
if j.jitter > 0.0 {
|
||||||
|
jitteredPeriod = Jitter(j.duration, j.jitter)
|
||||||
|
}
|
||||||
|
return jitteredPeriod
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
|
||||||
|
// The returned timer must be drained before calling Backoff() the second time
|
||||||
|
func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
|
||||||
|
backoff := j.getNextBackoff()
|
||||||
|
if j.backoffTimer == nil {
|
||||||
|
j.backoffTimer = j.clock.NewTimer(backoff)
|
||||||
|
} else {
|
||||||
|
j.backoffTimer.Reset(backoff)
|
||||||
|
}
|
||||||
|
return j.backoffTimer
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExponentialBackoff repeats a condition check with exponential backoff.
|
||||||
|
//
|
||||||
|
// It repeatedly checks the condition and then sleeps, using `backoff.Step()`
|
||||||
|
// to determine the length of the sleep and adjust Duration and Steps.
|
||||||
|
// Stops and returns as soon as:
|
||||||
|
// 1. the condition check returns true or an error,
|
||||||
|
// 2. `backoff.Steps` checks of the condition have been done, or
|
||||||
|
// 3. a sleep truncated by the cap on duration has been completed.
|
||||||
|
// In case (1) the returned error is what the condition function returned.
|
||||||
|
// In all other cases, ErrWaitTimeout is returned.
|
||||||
|
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
|
||||||
|
for backoff.Steps > 0 {
|
||||||
|
if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if backoff.Steps == 1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(backoff.Step())
|
||||||
|
}
|
||||||
|
return ErrWaitTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never
|
||||||
|
// exceeds the deadline specified by the request context.
|
||||||
|
func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionWithContextFunc) error {
|
||||||
|
for backoff.Steps > 0 {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok, err := runConditionWithCrashProtectionWithContext(ctx, condition); err != nil || ok {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if backoff.Steps == 1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
waitBeforeRetry := backoff.Step()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-time.After(waitBeforeRetry):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ErrWaitTimeout
|
||||||
|
}
|
22
staging/src/k8s.io/apimachinery/pkg/util/wait/error.go
Normal file
22
staging/src/k8s.io/apimachinery/pkg/util/wait/error.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2023 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package wait
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
// ErrWaitTimeout is returned when the condition exited without success.
|
||||||
|
var ErrWaitTimeout = errors.New("timed out waiting for the condition")
|
235
staging/src/k8s.io/apimachinery/pkg/util/wait/poll.go
Normal file
235
staging/src/k8s.io/apimachinery/pkg/util/wait/poll.go
Normal file
@ -0,0 +1,235 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2023 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package wait
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Poll tries a condition func until it returns true, an error, or the timeout
|
||||||
|
// is reached.
|
||||||
|
//
|
||||||
|
// Poll always waits the interval before the run of 'condition'.
|
||||||
|
// 'condition' will always be invoked at least once.
|
||||||
|
//
|
||||||
|
// Some intervals may be missed if the condition takes too long or the time
|
||||||
|
// window is too short.
|
||||||
|
//
|
||||||
|
// If you want to Poll something forever, see PollInfinite.
|
||||||
|
func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
|
||||||
|
return PollWithContext(context.Background(), interval, timeout, condition.WithContext())
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollWithContext tries a condition func until it returns true, an error,
|
||||||
|
// or when the context expires or the timeout is reached, whichever
|
||||||
|
// happens first.
|
||||||
|
//
|
||||||
|
// PollWithContext always waits the interval before the run of 'condition'.
|
||||||
|
// 'condition' will always be invoked at least once.
|
||||||
|
//
|
||||||
|
// Some intervals may be missed if the condition takes too long or the time
|
||||||
|
// window is too short.
|
||||||
|
//
|
||||||
|
// If you want to Poll something forever, see PollInfinite.
|
||||||
|
func PollWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error {
|
||||||
|
return poll(ctx, false, poller(interval, timeout), condition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollUntil tries a condition func until it returns true, an error or stopCh is
|
||||||
|
// closed.
|
||||||
|
//
|
||||||
|
// PollUntil always waits interval before the first run of 'condition'.
|
||||||
|
// 'condition' will always be invoked at least once.
|
||||||
|
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
|
||||||
|
return PollUntilWithContext(ContextForChannel(stopCh), interval, condition.WithContext())
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollUntilWithContext tries a condition func until it returns true,
|
||||||
|
// an error or the specified context is cancelled or expired.
|
||||||
|
//
|
||||||
|
// PollUntilWithContext always waits interval before the first run of 'condition'.
|
||||||
|
// 'condition' will always be invoked at least once.
|
||||||
|
func PollUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
|
||||||
|
return poll(ctx, false, poller(interval, 0), condition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollInfinite tries a condition func until it returns true or an error
|
||||||
|
//
|
||||||
|
// PollInfinite always waits the interval before the run of 'condition'.
|
||||||
|
//
|
||||||
|
// Some intervals may be missed if the condition takes too long or the time
|
||||||
|
// window is too short.
|
||||||
|
func PollInfinite(interval time.Duration, condition ConditionFunc) error {
|
||||||
|
return PollInfiniteWithContext(context.Background(), interval, condition.WithContext())
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollInfiniteWithContext tries a condition func until it returns true or an error
|
||||||
|
//
|
||||||
|
// PollInfiniteWithContext always waits the interval before the run of 'condition'.
|
||||||
|
//
|
||||||
|
// Some intervals may be missed if the condition takes too long or the time
|
||||||
|
// window is too short.
|
||||||
|
func PollInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
|
||||||
|
return poll(ctx, false, poller(interval, 0), condition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollImmediate tries a condition func until it returns true, an error, or the timeout
|
||||||
|
// is reached.
|
||||||
|
//
|
||||||
|
// PollImmediate always checks 'condition' before waiting for the interval. 'condition'
|
||||||
|
// will always be invoked at least once.
|
||||||
|
//
|
||||||
|
// Some intervals may be missed if the condition takes too long or the time
|
||||||
|
// window is too short.
|
||||||
|
//
|
||||||
|
// If you want to immediately Poll something forever, see PollImmediateInfinite.
|
||||||
|
func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {
|
||||||
|
return PollImmediateWithContext(context.Background(), interval, timeout, condition.WithContext())
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollImmediateWithContext tries a condition func until it returns true, an error,
|
||||||
|
// or the timeout is reached or the specified context expires, whichever happens first.
|
||||||
|
//
|
||||||
|
// PollImmediateWithContext always checks 'condition' before waiting for the interval.
|
||||||
|
// 'condition' will always be invoked at least once.
|
||||||
|
//
|
||||||
|
// Some intervals may be missed if the condition takes too long or the time
|
||||||
|
// window is too short.
|
||||||
|
//
|
||||||
|
// If you want to immediately Poll something forever, see PollImmediateInfinite.
|
||||||
|
func PollImmediateWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error {
|
||||||
|
return poll(ctx, true, poller(interval, timeout), condition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
|
||||||
|
//
|
||||||
|
// PollImmediateUntil runs the 'condition' before waiting for the interval.
|
||||||
|
// 'condition' will always be invoked at least once.
|
||||||
|
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
|
||||||
|
return PollImmediateUntilWithContext(ContextForChannel(stopCh), interval, condition.WithContext())
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollImmediateUntilWithContext tries a condition func until it returns true,
|
||||||
|
// an error or the specified context is cancelled or expired.
|
||||||
|
//
|
||||||
|
// PollImmediateUntilWithContext runs the 'condition' before waiting for the interval.
|
||||||
|
// 'condition' will always be invoked at least once.
|
||||||
|
func PollImmediateUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
|
||||||
|
return poll(ctx, true, poller(interval, 0), condition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollImmediateInfinite tries a condition func until it returns true or an error
|
||||||
|
//
|
||||||
|
// PollImmediateInfinite runs the 'condition' before waiting for the interval.
|
||||||
|
//
|
||||||
|
// Some intervals may be missed if the condition takes too long or the time
|
||||||
|
// window is too short.
|
||||||
|
func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error {
|
||||||
|
return PollImmediateInfiniteWithContext(context.Background(), interval, condition.WithContext())
|
||||||
|
}
|
||||||
|
|
||||||
|
// PollImmediateInfiniteWithContext tries a condition func until it returns true
|
||||||
|
// or an error or the specified context gets cancelled or expired.
|
||||||
|
//
|
||||||
|
// PollImmediateInfiniteWithContext runs the 'condition' before waiting for the interval.
|
||||||
|
//
|
||||||
|
// Some intervals may be missed if the condition takes too long or the time
|
||||||
|
// window is too short.
|
||||||
|
func PollImmediateInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
|
||||||
|
return poll(ctx, true, poller(interval, 0), condition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Internally used, each of the public 'Poll*' function defined in this
|
||||||
|
// package should invoke this internal function with appropriate parameters.
|
||||||
|
// ctx: the context specified by the caller, for infinite polling pass
|
||||||
|
// a context that never gets cancelled or expired.
|
||||||
|
// immediate: if true, the 'condition' will be invoked before waiting for the interval,
|
||||||
|
// in this case 'condition' will always be invoked at least once.
|
||||||
|
// wait: user specified WaitFunc function that controls at what interval the condition
|
||||||
|
// function should be invoked periodically and whether it is bound by a timeout.
|
||||||
|
// condition: user specified ConditionWithContextFunc function.
|
||||||
|
func poll(ctx context.Context, immediate bool, wait waitWithContextFunc, condition ConditionWithContextFunc) error {
|
||||||
|
if immediate {
|
||||||
|
done, err := runConditionWithCrashProtectionWithContext(ctx, condition)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// returning ctx.Err() will break backward compatibility
|
||||||
|
return ErrWaitTimeout
|
||||||
|
default:
|
||||||
|
return waitForWithContext(ctx, wait, condition)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// poller returns a WaitFunc that will send to the channel every interval until
|
||||||
|
// timeout has elapsed and then closes the channel.
|
||||||
|
//
|
||||||
|
// Over very short intervals you may receive no ticks before the channel is
|
||||||
|
// closed. A timeout of 0 is interpreted as an infinity, and in such a case
|
||||||
|
// it would be the caller's responsibility to close the done channel.
|
||||||
|
// Failure to do so would result in a leaked goroutine.
|
||||||
|
//
|
||||||
|
// Output ticks are not buffered. If the channel is not ready to receive an
|
||||||
|
// item, the tick is skipped.
|
||||||
|
func poller(interval, timeout time.Duration) waitWithContextFunc {
|
||||||
|
return waitWithContextFunc(func(ctx context.Context) <-chan struct{} {
|
||||||
|
ch := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(ch)
|
||||||
|
|
||||||
|
tick := time.NewTicker(interval)
|
||||||
|
defer tick.Stop()
|
||||||
|
|
||||||
|
var after <-chan time.Time
|
||||||
|
if timeout != 0 {
|
||||||
|
// time.After is more convenient, but it
|
||||||
|
// potentially leaves timers around much longer
|
||||||
|
// than necessary if we exit early.
|
||||||
|
timer := time.NewTimer(timeout)
|
||||||
|
after = timer.C
|
||||||
|
defer timer.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-tick.C:
|
||||||
|
// If the consumer isn't ready for this signal drop it and
|
||||||
|
// check the other channels.
|
||||||
|
select {
|
||||||
|
case ch <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
case <-after:
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ch
|
||||||
|
})
|
||||||
|
}
|
@ -18,14 +18,11 @@ package wait
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"math"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/utils/clock"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// For any test of the style:
|
// For any test of the style:
|
||||||
@ -83,113 +80,6 @@ func Forever(f func(), period time.Duration) {
|
|||||||
Until(f, period, NeverStop)
|
Until(f, period, NeverStop)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Until loops until stop channel is closed, running f every period.
|
|
||||||
//
|
|
||||||
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
|
|
||||||
// with sliding = true (which means the timer for period starts after the f
|
|
||||||
// completes).
|
|
||||||
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
|
|
||||||
JitterUntil(f, period, 0.0, true, stopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
// UntilWithContext loops until context is done, running f every period.
|
|
||||||
//
|
|
||||||
// UntilWithContext is syntactic sugar on top of JitterUntilWithContext
|
|
||||||
// with zero jitter factor and with sliding = true (which means the timer
|
|
||||||
// for period starts after the f completes).
|
|
||||||
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
|
|
||||||
JitterUntilWithContext(ctx, f, period, 0.0, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NonSlidingUntil loops until stop channel is closed, running f every
|
|
||||||
// period.
|
|
||||||
//
|
|
||||||
// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
|
|
||||||
// factor, with sliding = false (meaning the timer for period starts at the same
|
|
||||||
// time as the function starts).
|
|
||||||
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
|
|
||||||
JitterUntil(f, period, 0.0, false, stopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NonSlidingUntilWithContext loops until context is done, running f every
|
|
||||||
// period.
|
|
||||||
//
|
|
||||||
// NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext
|
|
||||||
// with zero jitter factor, with sliding = false (meaning the timer for period
|
|
||||||
// starts at the same time as the function starts).
|
|
||||||
func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
|
|
||||||
JitterUntilWithContext(ctx, f, period, 0.0, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
// JitterUntil loops until stop channel is closed, running f every period.
|
|
||||||
//
|
|
||||||
// If jitterFactor is positive, the period is jittered before every run of f.
|
|
||||||
// If jitterFactor is not positive, the period is unchanged and not jittered.
|
|
||||||
//
|
|
||||||
// If sliding is true, the period is computed after f runs. If it is false then
|
|
||||||
// period includes the runtime for f.
|
|
||||||
//
|
|
||||||
// Close stopCh to stop. f may not be invoked if stop channel is already
|
|
||||||
// closed. Pass NeverStop to if you don't want it stop.
|
|
||||||
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
|
|
||||||
BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
|
|
||||||
//
|
|
||||||
// If sliding is true, the period is computed after f runs. If it is false then
|
|
||||||
// period includes the runtime for f.
|
|
||||||
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
|
|
||||||
var t clock.Timer
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-stopCh:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if !sliding {
|
|
||||||
t = backoff.Backoff()
|
|
||||||
}
|
|
||||||
|
|
||||||
func() {
|
|
||||||
defer runtime.HandleCrash()
|
|
||||||
f()
|
|
||||||
}()
|
|
||||||
|
|
||||||
if sliding {
|
|
||||||
t = backoff.Backoff()
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: b/c there is no priority selection in golang
|
|
||||||
// it is possible for this to race, meaning we could
|
|
||||||
// trigger t.C and stopCh, and t.C select falls through.
|
|
||||||
// In order to mitigate we re-check stopCh at the beginning
|
|
||||||
// of every loop to prevent extra executions of f().
|
|
||||||
select {
|
|
||||||
case <-stopCh:
|
|
||||||
if !t.Stop() {
|
|
||||||
<-t.C()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case <-t.C():
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// JitterUntilWithContext loops until context is done, running f every period.
|
|
||||||
//
|
|
||||||
// If jitterFactor is positive, the period is jittered before every run of f.
|
|
||||||
// If jitterFactor is not positive, the period is unchanged and not jittered.
|
|
||||||
//
|
|
||||||
// If sliding is true, the period is computed after f runs. If it is false then
|
|
||||||
// period includes the runtime for f.
|
|
||||||
//
|
|
||||||
// Cancel context to stop. f may not be invoked if context is already expired.
|
|
||||||
func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
|
|
||||||
JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Jitter returns a time.Duration between duration and duration + maxFactor *
|
// Jitter returns a time.Duration between duration and duration + maxFactor *
|
||||||
// duration.
|
// duration.
|
||||||
//
|
//
|
||||||
@ -203,9 +93,6 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
|
|||||||
return wait
|
return wait
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrWaitTimeout is returned when the condition exited without success.
|
|
||||||
var ErrWaitTimeout = errors.New("timed out waiting for the condition")
|
|
||||||
|
|
||||||
// ConditionFunc returns true if the condition is satisfied, or an error
|
// ConditionFunc returns true if the condition is satisfied, or an error
|
||||||
// if the loop should be aborted.
|
// if the loop should be aborted.
|
||||||
type ConditionFunc func() (done bool, err error)
|
type ConditionFunc func() (done bool, err error)
|
||||||
@ -262,345 +149,6 @@ func runConditionWithCrashProtectionWithContext(ctx context.Context, condition C
|
|||||||
return condition(ctx)
|
return condition(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backoff holds parameters applied to a Backoff function.
|
|
||||||
type Backoff struct {
|
|
||||||
// The initial duration.
|
|
||||||
Duration time.Duration
|
|
||||||
// Duration is multiplied by factor each iteration, if factor is not zero
|
|
||||||
// and the limits imposed by Steps and Cap have not been reached.
|
|
||||||
// Should not be negative.
|
|
||||||
// The jitter does not contribute to the updates to the duration parameter.
|
|
||||||
Factor float64
|
|
||||||
// The sleep at each iteration is the duration plus an additional
|
|
||||||
// amount chosen uniformly at random from the interval between
|
|
||||||
// zero and `jitter*duration`.
|
|
||||||
Jitter float64
|
|
||||||
// The remaining number of iterations in which the duration
|
|
||||||
// parameter may change (but progress can be stopped earlier by
|
|
||||||
// hitting the cap). If not positive, the duration is not
|
|
||||||
// changed. Used for exponential backoff in combination with
|
|
||||||
// Factor and Cap.
|
|
||||||
Steps int
|
|
||||||
// A limit on revised values of the duration parameter. If a
|
|
||||||
// multiplication by the factor parameter would make the duration
|
|
||||||
// exceed the cap then the duration is set to the cap and the
|
|
||||||
// steps parameter is set to zero.
|
|
||||||
Cap time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// Step (1) returns an amount of time to sleep determined by the
|
|
||||||
// original Duration and Jitter and (2) mutates the provided Backoff
|
|
||||||
// to update its Steps and Duration.
|
|
||||||
func (b *Backoff) Step() time.Duration {
|
|
||||||
if b.Steps < 1 {
|
|
||||||
if b.Jitter > 0 {
|
|
||||||
return Jitter(b.Duration, b.Jitter)
|
|
||||||
}
|
|
||||||
return b.Duration
|
|
||||||
}
|
|
||||||
b.Steps--
|
|
||||||
|
|
||||||
duration := b.Duration
|
|
||||||
|
|
||||||
// calculate the next step
|
|
||||||
if b.Factor != 0 {
|
|
||||||
b.Duration = time.Duration(float64(b.Duration) * b.Factor)
|
|
||||||
if b.Cap > 0 && b.Duration > b.Cap {
|
|
||||||
b.Duration = b.Cap
|
|
||||||
b.Steps = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if b.Jitter > 0 {
|
|
||||||
duration = Jitter(duration, b.Jitter)
|
|
||||||
}
|
|
||||||
return duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
|
|
||||||
// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff()
|
|
||||||
// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in
|
|
||||||
// undetermined behavior.
|
|
||||||
// The BackoffManager is supposed to be called in a single-threaded environment.
|
|
||||||
type BackoffManager interface {
|
|
||||||
Backoff() clock.Timer
|
|
||||||
}
|
|
||||||
|
|
||||||
type exponentialBackoffManagerImpl struct {
|
|
||||||
backoff *Backoff
|
|
||||||
backoffTimer clock.Timer
|
|
||||||
lastBackoffStart time.Time
|
|
||||||
initialBackoff time.Duration
|
|
||||||
backoffResetDuration time.Duration
|
|
||||||
clock clock.Clock
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
|
|
||||||
// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
|
|
||||||
// This backoff manager is used to reduce load during upstream unhealthiness.
|
|
||||||
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
|
|
||||||
return &exponentialBackoffManagerImpl{
|
|
||||||
backoff: &Backoff{
|
|
||||||
Duration: initBackoff,
|
|
||||||
Factor: backoffFactor,
|
|
||||||
Jitter: jitter,
|
|
||||||
|
|
||||||
// the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
|
|
||||||
// what we ideally need here, we set it to max int and assume we will never use up the steps
|
|
||||||
Steps: math.MaxInt32,
|
|
||||||
Cap: maxBackoff,
|
|
||||||
},
|
|
||||||
backoffTimer: nil,
|
|
||||||
initialBackoff: initBackoff,
|
|
||||||
lastBackoffStart: c.Now(),
|
|
||||||
backoffResetDuration: resetDuration,
|
|
||||||
clock: c,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
|
|
||||||
if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
|
|
||||||
b.backoff.Steps = math.MaxInt32
|
|
||||||
b.backoff.Duration = b.initialBackoff
|
|
||||||
}
|
|
||||||
b.lastBackoffStart = b.clock.Now()
|
|
||||||
return b.backoff.Step()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
|
|
||||||
// The returned timer must be drained before calling Backoff() the second time
|
|
||||||
func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
|
|
||||||
if b.backoffTimer == nil {
|
|
||||||
b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
|
|
||||||
} else {
|
|
||||||
b.backoffTimer.Reset(b.getNextBackoff())
|
|
||||||
}
|
|
||||||
return b.backoffTimer
|
|
||||||
}
|
|
||||||
|
|
||||||
type jitteredBackoffManagerImpl struct {
|
|
||||||
clock clock.Clock
|
|
||||||
duration time.Duration
|
|
||||||
jitter float64
|
|
||||||
backoffTimer clock.Timer
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
|
|
||||||
// is negative, backoff will not be jittered.
|
|
||||||
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
|
|
||||||
return &jitteredBackoffManagerImpl{
|
|
||||||
clock: c,
|
|
||||||
duration: duration,
|
|
||||||
jitter: jitter,
|
|
||||||
backoffTimer: nil,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
|
|
||||||
jitteredPeriod := j.duration
|
|
||||||
if j.jitter > 0.0 {
|
|
||||||
jitteredPeriod = Jitter(j.duration, j.jitter)
|
|
||||||
}
|
|
||||||
return jitteredPeriod
|
|
||||||
}
|
|
||||||
|
|
||||||
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
|
|
||||||
// The returned timer must be drained before calling Backoff() the second time
|
|
||||||
func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
|
|
||||||
backoff := j.getNextBackoff()
|
|
||||||
if j.backoffTimer == nil {
|
|
||||||
j.backoffTimer = j.clock.NewTimer(backoff)
|
|
||||||
} else {
|
|
||||||
j.backoffTimer.Reset(backoff)
|
|
||||||
}
|
|
||||||
return j.backoffTimer
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExponentialBackoff repeats a condition check with exponential backoff.
|
|
||||||
//
|
|
||||||
// It repeatedly checks the condition and then sleeps, using `backoff.Step()`
|
|
||||||
// to determine the length of the sleep and adjust Duration and Steps.
|
|
||||||
// Stops and returns as soon as:
|
|
||||||
// 1. the condition check returns true or an error,
|
|
||||||
// 2. `backoff.Steps` checks of the condition have been done, or
|
|
||||||
// 3. a sleep truncated by the cap on duration has been completed.
|
|
||||||
// In case (1) the returned error is what the condition function returned.
|
|
||||||
// In all other cases, ErrWaitTimeout is returned.
|
|
||||||
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
|
|
||||||
for backoff.Steps > 0 {
|
|
||||||
if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if backoff.Steps == 1 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(backoff.Step())
|
|
||||||
}
|
|
||||||
return ErrWaitTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
// Poll tries a condition func until it returns true, an error, or the timeout
|
|
||||||
// is reached.
|
|
||||||
//
|
|
||||||
// Poll always waits the interval before the run of 'condition'.
|
|
||||||
// 'condition' will always be invoked at least once.
|
|
||||||
//
|
|
||||||
// Some intervals may be missed if the condition takes too long or the time
|
|
||||||
// window is too short.
|
|
||||||
//
|
|
||||||
// If you want to Poll something forever, see PollInfinite.
|
|
||||||
func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
|
|
||||||
return PollWithContext(context.Background(), interval, timeout, condition.WithContext())
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollWithContext tries a condition func until it returns true, an error,
|
|
||||||
// or when the context expires or the timeout is reached, whichever
|
|
||||||
// happens first.
|
|
||||||
//
|
|
||||||
// PollWithContext always waits the interval before the run of 'condition'.
|
|
||||||
// 'condition' will always be invoked at least once.
|
|
||||||
//
|
|
||||||
// Some intervals may be missed if the condition takes too long or the time
|
|
||||||
// window is too short.
|
|
||||||
//
|
|
||||||
// If you want to Poll something forever, see PollInfinite.
|
|
||||||
func PollWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error {
|
|
||||||
return poll(ctx, false, poller(interval, timeout), condition)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollUntil tries a condition func until it returns true, an error or stopCh is
|
|
||||||
// closed.
|
|
||||||
//
|
|
||||||
// PollUntil always waits interval before the first run of 'condition'.
|
|
||||||
// 'condition' will always be invoked at least once.
|
|
||||||
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
|
|
||||||
return PollUntilWithContext(ContextForChannel(stopCh), interval, condition.WithContext())
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollUntilWithContext tries a condition func until it returns true,
|
|
||||||
// an error or the specified context is cancelled or expired.
|
|
||||||
//
|
|
||||||
// PollUntilWithContext always waits interval before the first run of 'condition'.
|
|
||||||
// 'condition' will always be invoked at least once.
|
|
||||||
func PollUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
|
|
||||||
return poll(ctx, false, poller(interval, 0), condition)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollInfinite tries a condition func until it returns true or an error
|
|
||||||
//
|
|
||||||
// PollInfinite always waits the interval before the run of 'condition'.
|
|
||||||
//
|
|
||||||
// Some intervals may be missed if the condition takes too long or the time
|
|
||||||
// window is too short.
|
|
||||||
func PollInfinite(interval time.Duration, condition ConditionFunc) error {
|
|
||||||
return PollInfiniteWithContext(context.Background(), interval, condition.WithContext())
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollInfiniteWithContext tries a condition func until it returns true or an error
|
|
||||||
//
|
|
||||||
// PollInfiniteWithContext always waits the interval before the run of 'condition'.
|
|
||||||
//
|
|
||||||
// Some intervals may be missed if the condition takes too long or the time
|
|
||||||
// window is too short.
|
|
||||||
func PollInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
|
|
||||||
return poll(ctx, false, poller(interval, 0), condition)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollImmediate tries a condition func until it returns true, an error, or the timeout
|
|
||||||
// is reached.
|
|
||||||
//
|
|
||||||
// PollImmediate always checks 'condition' before waiting for the interval. 'condition'
|
|
||||||
// will always be invoked at least once.
|
|
||||||
//
|
|
||||||
// Some intervals may be missed if the condition takes too long or the time
|
|
||||||
// window is too short.
|
|
||||||
//
|
|
||||||
// If you want to immediately Poll something forever, see PollImmediateInfinite.
|
|
||||||
func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {
|
|
||||||
return PollImmediateWithContext(context.Background(), interval, timeout, condition.WithContext())
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollImmediateWithContext tries a condition func until it returns true, an error,
|
|
||||||
// or the timeout is reached or the specified context expires, whichever happens first.
|
|
||||||
//
|
|
||||||
// PollImmediateWithContext always checks 'condition' before waiting for the interval.
|
|
||||||
// 'condition' will always be invoked at least once.
|
|
||||||
//
|
|
||||||
// Some intervals may be missed if the condition takes too long or the time
|
|
||||||
// window is too short.
|
|
||||||
//
|
|
||||||
// If you want to immediately Poll something forever, see PollImmediateInfinite.
|
|
||||||
func PollImmediateWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error {
|
|
||||||
return poll(ctx, true, poller(interval, timeout), condition)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
|
|
||||||
//
|
|
||||||
// PollImmediateUntil runs the 'condition' before waiting for the interval.
|
|
||||||
// 'condition' will always be invoked at least once.
|
|
||||||
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
|
|
||||||
return PollImmediateUntilWithContext(ContextForChannel(stopCh), interval, condition.WithContext())
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollImmediateUntilWithContext tries a condition func until it returns true,
|
|
||||||
// an error or the specified context is cancelled or expired.
|
|
||||||
//
|
|
||||||
// PollImmediateUntilWithContext runs the 'condition' before waiting for the interval.
|
|
||||||
// 'condition' will always be invoked at least once.
|
|
||||||
func PollImmediateUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
|
|
||||||
return poll(ctx, true, poller(interval, 0), condition)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollImmediateInfinite tries a condition func until it returns true or an error
|
|
||||||
//
|
|
||||||
// PollImmediateInfinite runs the 'condition' before waiting for the interval.
|
|
||||||
//
|
|
||||||
// Some intervals may be missed if the condition takes too long or the time
|
|
||||||
// window is too short.
|
|
||||||
func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error {
|
|
||||||
return PollImmediateInfiniteWithContext(context.Background(), interval, condition.WithContext())
|
|
||||||
}
|
|
||||||
|
|
||||||
// PollImmediateInfiniteWithContext tries a condition func until it returns true
|
|
||||||
// or an error or the specified context gets cancelled or expired.
|
|
||||||
//
|
|
||||||
// PollImmediateInfiniteWithContext runs the 'condition' before waiting for the interval.
|
|
||||||
//
|
|
||||||
// Some intervals may be missed if the condition takes too long or the time
|
|
||||||
// window is too short.
|
|
||||||
func PollImmediateInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error {
|
|
||||||
return poll(ctx, true, poller(interval, 0), condition)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Internally used, each of the public 'Poll*' function defined in this
|
|
||||||
// package should invoke this internal function with appropriate parameters.
|
|
||||||
// ctx: the context specified by the caller, for infinite polling pass
|
|
||||||
// a context that never gets cancelled or expired.
|
|
||||||
// immediate: if true, the 'condition' will be invoked before waiting for the interval,
|
|
||||||
// in this case 'condition' will always be invoked at least once.
|
|
||||||
// wait: user specified WaitFunc function that controls at what interval the condition
|
|
||||||
// function should be invoked periodically and whether it is bound by a timeout.
|
|
||||||
// condition: user specified ConditionWithContextFunc function.
|
|
||||||
func poll(ctx context.Context, immediate bool, wait waitWithContextFunc, condition ConditionWithContextFunc) error {
|
|
||||||
if immediate {
|
|
||||||
done, err := runConditionWithCrashProtectionWithContext(ctx, condition)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if done {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
// returning ctx.Err() will break backward compatibility
|
|
||||||
return ErrWaitTimeout
|
|
||||||
default:
|
|
||||||
return waitForWithContext(ctx, wait, condition)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitFunc creates a channel that receives an item every time a test
|
// waitFunc creates a channel that receives an item every time a test
|
||||||
// should be executed and is closed when the last test should be invoked.
|
// should be executed and is closed when the last test should be invoked.
|
||||||
type waitFunc func(done <-chan struct{}) <-chan struct{}
|
type waitFunc func(done <-chan struct{}) <-chan struct{}
|
||||||
@ -662,83 +210,3 @@ func waitForWithContext(ctx context.Context, wait waitWithContextFunc, fn Condit
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// poller returns a WaitFunc that will send to the channel every interval until
|
|
||||||
// timeout has elapsed and then closes the channel.
|
|
||||||
//
|
|
||||||
// Over very short intervals you may receive no ticks before the channel is
|
|
||||||
// closed. A timeout of 0 is interpreted as an infinity, and in such a case
|
|
||||||
// it would be the caller's responsibility to close the done channel.
|
|
||||||
// Failure to do so would result in a leaked goroutine.
|
|
||||||
//
|
|
||||||
// Output ticks are not buffered. If the channel is not ready to receive an
|
|
||||||
// item, the tick is skipped.
|
|
||||||
func poller(interval, timeout time.Duration) waitWithContextFunc {
|
|
||||||
return waitWithContextFunc(func(ctx context.Context) <-chan struct{} {
|
|
||||||
ch := make(chan struct{})
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer close(ch)
|
|
||||||
|
|
||||||
tick := time.NewTicker(interval)
|
|
||||||
defer tick.Stop()
|
|
||||||
|
|
||||||
var after <-chan time.Time
|
|
||||||
if timeout != 0 {
|
|
||||||
// time.After is more convenient, but it
|
|
||||||
// potentially leaves timers around much longer
|
|
||||||
// than necessary if we exit early.
|
|
||||||
timer := time.NewTimer(timeout)
|
|
||||||
after = timer.C
|
|
||||||
defer timer.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-tick.C:
|
|
||||||
// If the consumer isn't ready for this signal drop it and
|
|
||||||
// check the other channels.
|
|
||||||
select {
|
|
||||||
case ch <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
case <-after:
|
|
||||||
return
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return ch
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never
|
|
||||||
// exceeds the deadline specified by the request context.
|
|
||||||
func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionWithContextFunc) error {
|
|
||||||
for backoff.Steps > 0 {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if ok, err := runConditionWithCrashProtectionWithContext(ctx, condition); err != nil || ok {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if backoff.Steps == 1 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
waitBeforeRetry := backoff.Step()
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-time.After(waitBeforeRetry):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ErrWaitTimeout
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user