mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #119762 from AxeZhan/PollUntilContextCancel
wait.PollUntilContextCancel immediately executes condition once
This commit is contained in:
commit
227d1b2357
@ -40,6 +40,10 @@ func loopConditionUntilContext(ctx context.Context, t Timer, immediate, sliding
|
|||||||
var timeCh <-chan time.Time
|
var timeCh <-chan time.Time
|
||||||
doneCh := ctx.Done()
|
doneCh := ctx.Done()
|
||||||
|
|
||||||
|
if !sliding {
|
||||||
|
timeCh = t.C()
|
||||||
|
}
|
||||||
|
|
||||||
// if immediate is true the condition is
|
// if immediate is true the condition is
|
||||||
// guaranteed to be executed at least once,
|
// guaranteed to be executed at least once,
|
||||||
// if we haven't requested immediate execution, delay once
|
// if we haven't requested immediate execution, delay once
|
||||||
@ -50,17 +54,27 @@ func loopConditionUntilContext(ctx context.Context, t Timer, immediate, sliding
|
|||||||
}(); err != nil || ok {
|
}(); err != nil || ok {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
if sliding {
|
||||||
timeCh = t.C()
|
timeCh = t.C()
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
|
||||||
|
// Wait for either the context to be cancelled or the next invocation be called
|
||||||
select {
|
select {
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case <-timeCh:
|
case <-timeCh:
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
// IMPORTANT: Because there is no channel priority selection in golang
|
||||||
// checking ctx.Err() is slightly faster than checking a select
|
// it is possible for very short timers to "win" the race in the previous select
|
||||||
|
// repeatedly even when the context has been canceled. We therefore must
|
||||||
|
// explicitly check for context cancellation on every loop and exit if true to
|
||||||
|
// guarantee that we don't invoke condition more than once after context has
|
||||||
|
// been cancelled.
|
||||||
if err := ctx.Err(); err != nil {
|
if err := ctx.Err(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -77,21 +91,5 @@ func loopConditionUntilContext(ctx context.Context, t Timer, immediate, sliding
|
|||||||
if sliding {
|
if sliding {
|
||||||
t.Next()
|
t.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
if timeCh == nil {
|
|
||||||
timeCh = t.C()
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: b/c there is no priority selection in golang
|
|
||||||
// it is possible for this to race, meaning we could
|
|
||||||
// trigger t.C and doneCh, and t.C select falls through.
|
|
||||||
// In order to mitigate we re-check doneCh at the beginning
|
|
||||||
// of every loop to guarantee at-most one extra execution
|
|
||||||
// of condition.
|
|
||||||
select {
|
|
||||||
case <-doneCh:
|
|
||||||
return ctx.Err()
|
|
||||||
case <-timeCh:
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -99,6 +99,7 @@ func Test_loopConditionUntilContext_semantic(t *testing.T) {
|
|||||||
cancelContextAfter int
|
cancelContextAfter int
|
||||||
attemptsExpected int
|
attemptsExpected int
|
||||||
errExpected error
|
errExpected error
|
||||||
|
timer Timer
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "condition successful is only one attempt",
|
name: "condition successful is only one attempt",
|
||||||
@ -203,45 +204,88 @@ func Test_loopConditionUntilContext_semantic(t *testing.T) {
|
|||||||
attemptsExpected: 0,
|
attemptsExpected: 0,
|
||||||
errExpected: context.DeadlineExceeded,
|
errExpected: context.DeadlineExceeded,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "context canceled before the second execution and immediate",
|
||||||
|
immediate: true,
|
||||||
|
context: func() (context.Context, context.CancelFunc) {
|
||||||
|
return context.WithTimeout(context.Background(), time.Second)
|
||||||
|
},
|
||||||
|
callback: func(attempts int) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
},
|
||||||
|
attemptsExpected: 1,
|
||||||
|
errExpected: context.DeadlineExceeded,
|
||||||
|
timer: Backoff{Duration: 2 * time.Second}.Timer(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "immediate and long duration of condition and sliding false",
|
||||||
|
immediate: true,
|
||||||
|
sliding: false,
|
||||||
|
context: func() (context.Context, context.CancelFunc) {
|
||||||
|
return context.WithTimeout(context.Background(), time.Second)
|
||||||
|
},
|
||||||
|
callback: func(attempts int) (bool, error) {
|
||||||
|
if attempts >= 4 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second / 5)
|
||||||
|
return false, nil
|
||||||
|
},
|
||||||
|
attemptsExpected: 4,
|
||||||
|
timer: Backoff{Duration: time.Second / 5, Jitter: 0.001}.Timer(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "immediate and long duration of condition and sliding true",
|
||||||
|
immediate: true,
|
||||||
|
sliding: true,
|
||||||
|
context: func() (context.Context, context.CancelFunc) {
|
||||||
|
return context.WithTimeout(context.Background(), time.Second)
|
||||||
|
},
|
||||||
|
callback: func(attempts int) (bool, error) {
|
||||||
|
if attempts >= 4 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second / 5)
|
||||||
|
return false, nil
|
||||||
|
},
|
||||||
|
errExpected: context.DeadlineExceeded,
|
||||||
|
attemptsExpected: 3,
|
||||||
|
timer: Backoff{Duration: time.Second / 5, Jitter: 0.001}.Timer(),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
for _, immediate := range []bool{true, false} {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
t.Run(fmt.Sprintf("immediate=%t", immediate), func(t *testing.T) {
|
contextFn := test.context
|
||||||
for _, sliding := range []bool{true, false} {
|
if contextFn == nil {
|
||||||
t.Run(fmt.Sprintf("sliding=%t", sliding), func(t *testing.T) {
|
contextFn = defaultContext
|
||||||
t.Run(test.name, func(t *testing.T) {
|
}
|
||||||
contextFn := test.context
|
ctx, cancel := contextFn()
|
||||||
if contextFn == nil {
|
defer cancel()
|
||||||
contextFn = defaultContext
|
|
||||||
}
|
|
||||||
ctx, cancel := contextFn()
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
timer := Backoff{Duration: time.Microsecond}.Timer()
|
timer := test.timer
|
||||||
attempts := 0
|
if timer == nil {
|
||||||
err := loopConditionUntilContext(ctx, timer, test.immediate, test.sliding, func(_ context.Context) (bool, error) {
|
timer = Backoff{Duration: time.Microsecond}.Timer()
|
||||||
attempts++
|
}
|
||||||
defer func() {
|
attempts := 0
|
||||||
if test.cancelContextAfter > 0 && test.cancelContextAfter == attempts {
|
err := loopConditionUntilContext(ctx, timer, test.immediate, test.sliding, func(_ context.Context) (bool, error) {
|
||||||
cancel()
|
attempts++
|
||||||
}
|
defer func() {
|
||||||
}()
|
if test.cancelContextAfter > 0 && test.cancelContextAfter == attempts {
|
||||||
return test.callback(attempts)
|
cancel()
|
||||||
})
|
}
|
||||||
|
}()
|
||||||
if test.errExpected != err {
|
return test.callback(attempts)
|
||||||
t.Errorf("expected error: %v but got: %v", test.errExpected, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if test.attemptsExpected != attempts {
|
|
||||||
t.Errorf("expected attempts count: %d but got: %d", test.attemptsExpected, attempts)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
|
||||||
|
if test.errExpected != err {
|
||||||
|
t.Errorf("expected error: %v but got: %v", test.errExpected, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.attemptsExpected != attempts {
|
||||||
|
t.Errorf("expected attempts count: %d but got: %d", test.attemptsExpected, attempts)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user