From 0869e636a904847752268511513293822a9b95e9 Mon Sep 17 00:00:00 2001 From: Oren Shomron Date: Wed, 21 Nov 2018 14:25:24 -0500 Subject: [PATCH] Fix goroutine leak in pkg/util/wait PollUntil() Ensures the poller spawned in wait.PollUntil is signalled after the wait condition is met or returns an error, even if the original stopCh is never closed. Signed-off-by: Oren Shomron --- .../k8s.io/apimachinery/pkg/util/wait/wait.go | 27 +++++++++++++++-- .../apimachinery/pkg/util/wait/wait_test.go | 30 +++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go index 590c17b4c59..d181719115b 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -218,6 +218,25 @@ func (b *Backoff) Step() time.Duration { return duration } +// contextForChannel derives a child context from a parent channel. +// +// The derived context's Done channel is closed when the returned cancel function +// is called or when the parent channel is closed, whichever happens first. +// +// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked. +func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + select { + case <-parentCh: + cancel() + case <-ctx.Done(): + } + }() + return ctx, cancel +} + // ExponentialBackoff repeats a condition check with exponential backoff. // // It checks the condition up to Steps times, increasing the wait by multiplying @@ -321,7 +340,9 @@ func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) erro // PollUntil always waits interval before the first run of 'condition'. // 'condition' will always be invoked at least once. func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { - return WaitFor(poller(interval, 0), condition, stopCh) + ctx, cancel := contextForChannel(stopCh) + defer cancel() + return WaitFor(poller(interval, 0), condition, ctx.Done()) } // PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed. @@ -380,7 +401,9 @@ func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { // timeout has elapsed and then closes the channel. // // Over very short intervals you may receive no ticks before the channel is -// closed. A timeout of 0 is interpreted as an infinity. +// closed. A timeout of 0 is interpreted as an infinity, and in such a case +// it would be the caller's responsibility to close the done channel. +// Failure to do so would result in a leaked goroutine. // // Output ticks are not buffered. If the channel is not ready to receive an // item, the tick is skipped. diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go index 24073bb1922..795cc6a2634 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go @@ -544,3 +544,33 @@ func TestBackoff_Step(t *testing.T) { } } } + +func TestContextForChannel(t *testing.T) { + var wg sync.WaitGroup + parentCh := make(chan struct{}) + done := make(chan struct{}) + + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := contextForChannel(parentCh) + defer cancel() + <-ctx.Done() + }() + } + + go func() { + wg.Wait() + close(done) + }() + + // Closing parent channel should cancel all children contexts + close(parentCh) + + select { + case <-done: + case <-time.After(ForeverTestTimeout): + t.Errorf("unexepcted timeout waiting for parent to cancel child contexts") + } +}