mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #71326 from shomron/issue-71277-polluntil-leak
Fix goroutine leak in pkg/util/wait PollUntil()
This commit is contained in:
commit
206ecb4ab8
@ -250,6 +250,25 @@ func (b *Backoff) Step() time.Duration {
|
|||||||
return duration
|
return duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// contextForChannel derives a child context from a parent channel.
|
||||||
|
//
|
||||||
|
// The derived context's Done channel is closed when the returned cancel function
|
||||||
|
// is called or when the parent channel is closed, whichever happens first.
|
||||||
|
//
|
||||||
|
// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
|
||||||
|
func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-parentCh:
|
||||||
|
cancel()
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return ctx, cancel
|
||||||
|
}
|
||||||
|
|
||||||
// ExponentialBackoff repeats a condition check with exponential backoff.
|
// ExponentialBackoff repeats a condition check with exponential backoff.
|
||||||
//
|
//
|
||||||
// It checks the condition up to Steps times, increasing the wait by multiplying
|
// It checks the condition up to Steps times, increasing the wait by multiplying
|
||||||
@ -353,7 +372,9 @@ func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) erro
|
|||||||
// PollUntil always waits interval before the first run of 'condition'.
|
// PollUntil always waits interval before the first run of 'condition'.
|
||||||
// 'condition' will always be invoked at least once.
|
// 'condition' will always be invoked at least once.
|
||||||
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
|
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
|
||||||
return WaitFor(poller(interval, 0), condition, stopCh)
|
ctx, cancel := contextForChannel(stopCh)
|
||||||
|
defer cancel()
|
||||||
|
return WaitFor(poller(interval, 0), condition, ctx.Done())
|
||||||
}
|
}
|
||||||
|
|
||||||
// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
|
// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
|
||||||
@ -422,7 +443,9 @@ func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
|
|||||||
// timeout has elapsed and then closes the channel.
|
// timeout has elapsed and then closes the channel.
|
||||||
//
|
//
|
||||||
// Over very short intervals you may receive no ticks before the channel is
|
// Over very short intervals you may receive no ticks before the channel is
|
||||||
// closed. A timeout of 0 is interpreted as an infinity.
|
// closed. A timeout of 0 is interpreted as an infinity, and in such a case
|
||||||
|
// it would be the caller's responsibility to close the done channel.
|
||||||
|
// Failure to do so would result in a leaked goroutine.
|
||||||
//
|
//
|
||||||
// Output ticks are not buffered. If the channel is not ready to receive an
|
// Output ticks are not buffered. If the channel is not ready to receive an
|
||||||
// item, the tick is skipped.
|
// item, the tick is skipped.
|
||||||
|
@ -664,3 +664,33 @@ func TestBackoff_Step(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestContextForChannel(t *testing.T) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
parentCh := make(chan struct{})
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
ctx, cancel := contextForChannel(parentCh)
|
||||||
|
defer cancel()
|
||||||
|
<-ctx.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Closing parent channel should cancel all children contexts
|
||||||
|
close(parentCh)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(ForeverTestTimeout):
|
||||||
|
t.Errorf("unexepcted timeout waiting for parent to cancel child contexts")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user