mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #115116 from smarterclayton/hide_wait
wait: Make WaitFor and WaitForWithContext private
This commit is contained in:
commit
b01afdca66
@ -577,7 +577,7 @@ func PollImmediateInfiniteWithContext(ctx context.Context, interval time.Duratio
|
||||
// wait: user specified WaitFunc function that controls at what interval the condition
|
||||
// function should be invoked periodically and whether it is bound by a timeout.
|
||||
// condition: user specified ConditionWithContextFunc function.
|
||||
func poll(ctx context.Context, immediate bool, wait WaitWithContextFunc, condition ConditionWithContextFunc) error {
|
||||
func poll(ctx context.Context, immediate bool, wait waitWithContextFunc, condition ConditionWithContextFunc) error {
|
||||
if immediate {
|
||||
done, err := runConditionWithCrashProtectionWithContext(ctx, condition)
|
||||
if err != nil {
|
||||
@ -593,55 +593,36 @@ func poll(ctx context.Context, immediate bool, wait WaitWithContextFunc, conditi
|
||||
// returning ctx.Err() will break backward compatibility
|
||||
return ErrWaitTimeout
|
||||
default:
|
||||
return WaitForWithContext(ctx, wait, condition)
|
||||
return waitForWithContext(ctx, wait, condition)
|
||||
}
|
||||
}
|
||||
|
||||
// WaitFunc creates a channel that receives an item every time a test
|
||||
// waitFunc creates a channel that receives an item every time a test
|
||||
// should be executed and is closed when the last test should be invoked.
|
||||
type WaitFunc func(done <-chan struct{}) <-chan struct{}
|
||||
type waitFunc func(done <-chan struct{}) <-chan struct{}
|
||||
|
||||
// WithContext converts the WaitFunc to an equivalent WaitWithContextFunc
|
||||
func (w WaitFunc) WithContext() WaitWithContextFunc {
|
||||
func (w waitFunc) WithContext() waitWithContextFunc {
|
||||
return func(ctx context.Context) <-chan struct{} {
|
||||
return w(ctx.Done())
|
||||
}
|
||||
}
|
||||
|
||||
// WaitWithContextFunc creates a channel that receives an item every time a test
|
||||
// waitWithContextFunc creates a channel that receives an item every time a test
|
||||
// should be executed and is closed when the last test should be invoked.
|
||||
//
|
||||
// When the specified context gets cancelled or expires the function
|
||||
// stops sending item and returns immediately.
|
||||
type WaitWithContextFunc func(ctx context.Context) <-chan struct{}
|
||||
//
|
||||
// Deprecated: Will be removed when the legacy Poll methods are removed.
|
||||
type waitWithContextFunc func(ctx context.Context) <-chan struct{}
|
||||
|
||||
// WaitFor continually checks 'fn' as driven by 'wait'.
|
||||
// waitForWithContext continually checks 'fn' as driven by 'wait'.
|
||||
//
|
||||
// WaitFor gets a channel from 'wait()”, and then invokes 'fn' once for every value
|
||||
// placed on the channel and once more when the channel is closed. If the channel is closed
|
||||
// and 'fn' returns false without error, WaitFor returns ErrWaitTimeout.
|
||||
//
|
||||
// If 'fn' returns an error the loop ends and that error is returned. If
|
||||
// 'fn' returns true the loop ends and nil is returned.
|
||||
//
|
||||
// ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever
|
||||
// returning true.
|
||||
//
|
||||
// When the done channel is closed, because the golang `select` statement is
|
||||
// "uniform pseudo-random", the `fn` might still run one or multiple time,
|
||||
// though eventually `WaitFor` will return.
|
||||
func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
|
||||
ctx, cancel := ContextForChannel(done)
|
||||
defer cancel()
|
||||
return WaitForWithContext(ctx, wait.WithContext(), fn.WithContext())
|
||||
}
|
||||
|
||||
// WaitForWithContext continually checks 'fn' as driven by 'wait'.
|
||||
//
|
||||
// WaitForWithContext gets a channel from 'wait()”, and then invokes 'fn'
|
||||
// waitForWithContext gets a channel from 'wait()”, and then invokes 'fn'
|
||||
// once for every value placed on the channel and once more when the
|
||||
// channel is closed. If the channel is closed and 'fn'
|
||||
// returns false without error, WaitForWithContext returns ErrWaitTimeout.
|
||||
// returns false without error, waitForWithContext returns ErrWaitTimeout.
|
||||
//
|
||||
// If 'fn' returns an error the loop ends and that error is returned. If
|
||||
// 'fn' returns true the loop ends and nil is returned.
|
||||
@ -651,8 +632,10 @@ func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
|
||||
//
|
||||
// When the ctx.Done() channel is closed, because the golang `select` statement is
|
||||
// "uniform pseudo-random", the `fn` might still run one or multiple times,
|
||||
// though eventually `WaitForWithContext` will return.
|
||||
func WaitForWithContext(ctx context.Context, wait WaitWithContextFunc, fn ConditionWithContextFunc) error {
|
||||
// though eventually `waitForWithContext` will return.
|
||||
//
|
||||
// Deprecated: Will be removed when the legacy Poll methods are removed.
|
||||
func waitForWithContext(ctx context.Context, wait waitWithContextFunc, fn ConditionWithContextFunc) error {
|
||||
waitCtx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
c := wait(waitCtx)
|
||||
@ -686,8 +669,8 @@ func WaitForWithContext(ctx context.Context, wait WaitWithContextFunc, fn Condit
|
||||
//
|
||||
// Output ticks are not buffered. If the channel is not ready to receive an
|
||||
// item, the tick is skipped.
|
||||
func poller(interval, timeout time.Duration) WaitWithContextFunc {
|
||||
return WaitWithContextFunc(func(ctx context.Context) <-chan struct{} {
|
||||
func poller(interval, timeout time.Duration) waitWithContextFunc {
|
||||
return waitWithContextFunc(func(ctx context.Context) <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
|
@ -311,7 +311,7 @@ type fakePoller struct {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func fakeTicker(max int, used *int32, doneFunc func()) WaitFunc {
|
||||
func fakeTicker(max int, used *int32, doneFunc func()) waitFunc {
|
||||
return func(done <-chan struct{}) <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
@ -332,7 +332,7 @@ func fakeTicker(max int, used *int32, doneFunc func()) WaitFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func (fp *fakePoller) GetWaitFunc() WaitFunc {
|
||||
func (fp *fakePoller) GetwaitFunc() waitFunc {
|
||||
fp.wg.Add(1)
|
||||
return fakeTicker(fp.max, &fp.used, fp.wg.Done)
|
||||
}
|
||||
@ -347,7 +347,7 @@ func TestPoll(t *testing.T) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
if err := poll(ctx, false, fp.GetWaitFunc().WithContext(), f.WithContext()); err != nil {
|
||||
if err := poll(ctx, false, fp.GetwaitFunc().WithContext(), f.WithContext()); err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
fp.wg.Wait()
|
||||
@ -369,7 +369,7 @@ func TestPollError(t *testing.T) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
if err := poll(ctx, false, fp.GetWaitFunc().WithContext(), f.WithContext()); err == nil || err != expectedError {
|
||||
if err := poll(ctx, false, fp.GetwaitFunc().WithContext(), f.WithContext()); err == nil || err != expectedError {
|
||||
t.Fatalf("Expected error %v, got none %v", expectedError, err)
|
||||
}
|
||||
fp.wg.Wait()
|
||||
@ -389,10 +389,10 @@ func TestPollImmediate(t *testing.T) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
if err := poll(ctx, true, fp.GetWaitFunc().WithContext(), f.WithContext()); err != nil {
|
||||
if err := poll(ctx, true, fp.GetwaitFunc().WithContext(), f.WithContext()); err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
// We don't need to wait for fp.wg, as pollImmediate shouldn't call WaitFunc at all.
|
||||
// We don't need to wait for fp.wg, as pollImmediate shouldn't call waitFunc at all.
|
||||
if invocations != 1 {
|
||||
t.Errorf("Expected exactly one invocation, got %d", invocations)
|
||||
}
|
||||
@ -411,10 +411,10 @@ func TestPollImmediateError(t *testing.T) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
if err := poll(ctx, true, fp.GetWaitFunc().WithContext(), f.WithContext()); err == nil || err != expectedError {
|
||||
if err := poll(ctx, true, fp.GetwaitFunc().WithContext(), f.WithContext()); err == nil || err != expectedError {
|
||||
t.Fatalf("Expected error %v, got none %v", expectedError, err)
|
||||
}
|
||||
// We don't need to wait for fp.wg, as pollImmediate shouldn't call WaitFunc at all.
|
||||
// We don't need to wait for fp.wg, as pollImmediate shouldn't call waitFunc at all.
|
||||
used := atomic.LoadInt32(&fp.used)
|
||||
if used != 0 {
|
||||
t.Errorf("Expected exactly zero ticks, got %d", used)
|
||||
@ -481,7 +481,7 @@ func TestPollForever(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitFor(t *testing.T) {
|
||||
func Test_waitFor(t *testing.T) {
|
||||
var invocations int
|
||||
testCases := map[string]struct {
|
||||
F ConditionFunc
|
||||
@ -504,7 +504,7 @@ func TestWaitFor(t *testing.T) {
|
||||
return false, nil
|
||||
}),
|
||||
2,
|
||||
3, // the contract of WaitFor() says the func is called once more at the end of the wait
|
||||
3, // the contract of waitFor() says the func is called once more at the end of the wait
|
||||
true,
|
||||
},
|
||||
"returns immediately on error": {
|
||||
@ -523,7 +523,9 @@ func TestWaitFor(t *testing.T) {
|
||||
err := func() error {
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
return WaitFor(ticker, c.F, done)
|
||||
ctx, cancel := ContextForChannel(done)
|
||||
defer cancel()
|
||||
return waitForWithContext(ctx, ticker.WithContext(), c.F.WithContext())
|
||||
}()
|
||||
switch {
|
||||
case c.Err && err == nil:
|
||||
@ -539,23 +541,25 @@ func TestWaitFor(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWaitForWithEarlyClosingWaitFunc tests WaitFor when the WaitFunc closes its channel. The WaitFor should
|
||||
// Test_waitForWithEarlyClosing_waitFunc tests waitFor when the waitFunc closes its channel. The waitFor should
|
||||
// always return ErrWaitTimeout.
|
||||
func TestWaitForWithEarlyClosingWaitFunc(t *testing.T) {
|
||||
func Test_waitForWithEarlyClosing_waitFunc(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
ctx, cancel := ContextForChannel(stopCh)
|
||||
defer cancel()
|
||||
start := time.Now()
|
||||
err := WaitFor(func(done <-chan struct{}) <-chan struct{} {
|
||||
err := waitForWithContext(ctx, func(ctx context.Context) <-chan struct{} {
|
||||
c := make(chan struct{})
|
||||
close(c)
|
||||
return c
|
||||
}, func() (bool, error) {
|
||||
}, func(_ context.Context) (bool, error) {
|
||||
return false, nil
|
||||
}, stopCh)
|
||||
})
|
||||
duration := time.Since(start)
|
||||
|
||||
// The WaitFor should return immediately, so the duration is close to 0s.
|
||||
// The waitFor should return immediately, so the duration is close to 0s.
|
||||
if duration >= ForeverTestTimeout/2 {
|
||||
t.Errorf("expected short timeout duration")
|
||||
}
|
||||
@ -564,48 +568,51 @@ func TestWaitForWithEarlyClosingWaitFunc(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWaitForWithClosedChannel tests WaitFor when it receives a closed channel. The WaitFor should
|
||||
// Test_waitForWithClosedChannel tests waitFor when it receives a closed channel. The waitFor should
|
||||
// always return ErrWaitTimeout.
|
||||
func TestWaitForWithClosedChannel(t *testing.T) {
|
||||
func Test_waitForWithClosedChannel(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
close(stopCh)
|
||||
c := make(chan struct{})
|
||||
defer close(c)
|
||||
ctx, cancel := ContextForChannel(stopCh)
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
err := WaitFor(func(done <-chan struct{}) <-chan struct{} {
|
||||
err := waitForWithContext(ctx, func(_ context.Context) <-chan struct{} {
|
||||
return c
|
||||
}, func() (bool, error) {
|
||||
}, func(_ context.Context) (bool, error) {
|
||||
return false, nil
|
||||
}, stopCh)
|
||||
})
|
||||
duration := time.Since(start)
|
||||
// The WaitFor should return immediately, so the duration is close to 0s.
|
||||
// The waitFor should return immediately, so the duration is close to 0s.
|
||||
if duration >= ForeverTestTimeout/2 {
|
||||
t.Errorf("expected short timeout duration")
|
||||
}
|
||||
// The interval of the poller is ForeverTestTimeout, so the WaitFor should always return ErrWaitTimeout.
|
||||
// The interval of the poller is ForeverTestTimeout, so the waitFor should always return ErrWaitTimeout.
|
||||
if err != ErrWaitTimeout {
|
||||
t.Errorf("expected ErrWaitTimeout from WaitFunc")
|
||||
}
|
||||
}
|
||||
|
||||
// TestWaitForWithContextCancelsContext verifies that after the condition func returns true,
|
||||
// WaitForWithContext cancels the context it supplies to the WaitWithContextFunc.
|
||||
func TestWaitForWithContextCancelsContext(t *testing.T) {
|
||||
// Test_waitForWithContextCancelsContext verifies that after the condition func returns true,
|
||||
// waitForWithContext cancels the context it supplies to the WaitWithContextFunc.
|
||||
func Test_waitForWithContextCancelsContext(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
waitFunc := poller(time.Millisecond, ForeverTestTimeout)
|
||||
|
||||
var ctxPassedToWait context.Context
|
||||
WaitForWithContext(ctx, func(ctx context.Context) <-chan struct{} {
|
||||
waitForWithContext(ctx, func(ctx context.Context) <-chan struct{} {
|
||||
ctxPassedToWait = ctx
|
||||
return waitFunc(ctx)
|
||||
}, func(ctx context.Context) (bool, error) {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
return true, nil
|
||||
})
|
||||
// The polling goroutine should be closed after WaitForWithContext returning.
|
||||
// The polling goroutine should be closed after waitForWithContext returning.
|
||||
if ctxPassedToWait.Err() != context.Canceled {
|
||||
t.Errorf("expected the context passed to WaitForWithContext to be closed with: %v, but got: %v", context.Canceled, ctxPassedToWait.Err())
|
||||
t.Errorf("expected the context passed to waitForWithContext to be closed with: %v, but got: %v", context.Canceled, ctxPassedToWait.Err())
|
||||
}
|
||||
}
|
||||
|
||||
@ -1004,13 +1011,13 @@ func TestPollImmediateUntilWithContext(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitForWithContext(t *testing.T) {
|
||||
func Test_waitForWithContext(t *testing.T) {
|
||||
fakeErr := errors.New("fake error")
|
||||
tests := []struct {
|
||||
name string
|
||||
context func() (context.Context, context.CancelFunc)
|
||||
condition ConditionWithContextFunc
|
||||
waitFunc func() WaitFunc
|
||||
waitFunc func() waitFunc
|
||||
attemptsExpected int
|
||||
errExpected error
|
||||
}{
|
||||
@ -1022,7 +1029,7 @@ func TestWaitForWithContext(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return true, nil
|
||||
}),
|
||||
waitFunc: func() WaitFunc { return fakeTicker(2, nil, func() {}) },
|
||||
waitFunc: func() waitFunc { return fakeTicker(2, nil, func() {}) },
|
||||
attemptsExpected: 1,
|
||||
errExpected: nil,
|
||||
},
|
||||
@ -1034,8 +1041,8 @@ func TestWaitForWithContext(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return false, nil
|
||||
}),
|
||||
waitFunc: func() WaitFunc { return fakeTicker(2, nil, func() {}) },
|
||||
// the contract of WaitForWithContext() says the func is called once more at the end of the wait
|
||||
waitFunc: func() waitFunc { return fakeTicker(2, nil, func() {}) },
|
||||
// the contract of waitForWithContext() says the func is called once more at the end of the wait
|
||||
attemptsExpected: 3,
|
||||
errExpected: ErrWaitTimeout,
|
||||
},
|
||||
@ -1047,7 +1054,7 @@ func TestWaitForWithContext(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return false, fakeErr
|
||||
}),
|
||||
waitFunc: func() WaitFunc { return fakeTicker(2, nil, func() {}) },
|
||||
waitFunc: func() waitFunc { return fakeTicker(2, nil, func() {}) },
|
||||
attemptsExpected: 1,
|
||||
errExpected: fakeErr,
|
||||
},
|
||||
@ -1061,7 +1068,7 @@ func TestWaitForWithContext(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return false, nil
|
||||
}),
|
||||
waitFunc: func() WaitFunc {
|
||||
waitFunc: func() waitFunc {
|
||||
return func(done <-chan struct{}) <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
// never tick on this channel
|
||||
@ -1086,7 +1093,7 @@ func TestWaitForWithContext(t *testing.T) {
|
||||
ctx, cancel := test.context()
|
||||
defer cancel()
|
||||
|
||||
return WaitForWithContext(ctx, ticker.WithContext(), conditionWrapper)
|
||||
return waitForWithContext(ctx, ticker.WithContext(), conditionWrapper)
|
||||
}()
|
||||
|
||||
if test.errExpected != err {
|
||||
@ -1105,7 +1112,7 @@ func TestPollInternal(t *testing.T) {
|
||||
name string
|
||||
context func() (context.Context, context.CancelFunc)
|
||||
immediate bool
|
||||
waitFunc func() WaitFunc
|
||||
waitFunc func() waitFunc
|
||||
condition ConditionWithContextFunc
|
||||
cancelContextAfter int
|
||||
attemptsExpected int
|
||||
@ -1188,7 +1195,7 @@ func TestPollInternal(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return false, fakeErr
|
||||
}),
|
||||
waitFunc: func() WaitFunc { return fakeTicker(5, nil, func() {}) },
|
||||
waitFunc: func() waitFunc { return fakeTicker(5, nil, func() {}) },
|
||||
attemptsExpected: 1,
|
||||
errExpected: fakeErr,
|
||||
},
|
||||
@ -1201,7 +1208,7 @@ func TestPollInternal(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return true, nil
|
||||
}),
|
||||
waitFunc: func() WaitFunc { return fakeTicker(5, nil, func() {}) },
|
||||
waitFunc: func() waitFunc { return fakeTicker(5, nil, func() {}) },
|
||||
attemptsExpected: 1,
|
||||
errExpected: nil,
|
||||
},
|
||||
@ -1214,7 +1221,7 @@ func TestPollInternal(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return true, nil
|
||||
}),
|
||||
waitFunc: func() WaitFunc {
|
||||
waitFunc: func() waitFunc {
|
||||
return func(done <-chan struct{}) <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
@ -1233,7 +1240,7 @@ func TestPollInternal(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return false, fakeErr
|
||||
}),
|
||||
waitFunc: func() WaitFunc {
|
||||
waitFunc: func() waitFunc {
|
||||
return func(done <-chan struct{}) <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
@ -1252,7 +1259,7 @@ func TestPollInternal(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return false, nil
|
||||
}),
|
||||
waitFunc: func() WaitFunc {
|
||||
waitFunc: func() waitFunc {
|
||||
return func(done <-chan struct{}) <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
@ -1271,8 +1278,8 @@ func TestPollInternal(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return false, nil
|
||||
}),
|
||||
waitFunc: func() WaitFunc { return fakeTicker(2, nil, func() {}) },
|
||||
// the contract of WaitForWithContext() says the func is called once more at the end of the wait
|
||||
waitFunc: func() waitFunc { return fakeTicker(2, nil, func() {}) },
|
||||
// the contract of waitForWithContext() says the func is called once more at the end of the wait
|
||||
attemptsExpected: 3,
|
||||
errExpected: ErrWaitTimeout,
|
||||
},
|
||||
@ -1285,7 +1292,7 @@ func TestPollInternal(t *testing.T) {
|
||||
condition: ConditionWithContextFunc(func(context.Context) (bool, error) {
|
||||
return false, nil
|
||||
}),
|
||||
waitFunc: func() WaitFunc {
|
||||
waitFunc: func() waitFunc {
|
||||
return func(done <-chan struct{}) <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
// just tick twice
|
||||
@ -1305,7 +1312,7 @@ func TestPollInternal(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
var attempts int
|
||||
ticker := WaitFunc(func(done <-chan struct{}) <-chan struct{} {
|
||||
ticker := waitFunc(func(done <-chan struct{}) <-chan struct{} {
|
||||
return nil
|
||||
})
|
||||
if test.waitFunc != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user