diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index d69373d64c4..ba2c010da71 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -242,7 +242,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { // No leader election, run directly if !c.ComponentConfig.Generic.LeaderElection.LeaderElect { - ctx, _ := wait.ContextForChannel(stopCh) + ctx := wait.ContextForChannel(stopCh) run(ctx, saTokenControllerInitFunc, NewControllerInitializers) return nil } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go index b43d102a8d6..ed39e093933 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -223,6 +223,33 @@ func (cf ConditionFunc) WithContext() ConditionWithContextFunc { } } +// ContextForChannel provides a context that will be treated as cancelled +// when the provided parentCh is closed. The implementation returns +// context.Canceled for Err() if and only if the parentCh is closed. +func ContextForChannel(parentCh <-chan struct{}) context.Context { + return channelContext{stopCh: parentCh} +} + +var _ context.Context = channelContext{} + +// channelContext will behave as if the context were cancelled when stopCh is +// closed. +type channelContext struct { + stopCh <-chan struct{} +} + +func (c channelContext) Done() <-chan struct{} { return c.stopCh } +func (c channelContext) Err() error { + select { + case <-c.stopCh: + return context.Canceled + default: + return nil + } +} +func (c channelContext) Deadline() (time.Time, bool) { return time.Time{}, false } +func (c channelContext) Value(key any) any { return nil } + // runConditionWithCrashProtection runs a ConditionFunc with crash protection func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) { return runConditionWithCrashProtectionWithContext(context.TODO(), condition.WithContext()) @@ -290,25 +317,6 @@ func (b *Backoff) Step() time.Duration { return duration } -// ContextForChannel derives a child context from a parent channel. -// -// The derived context's Done channel is closed when the returned cancel function -// is called or when the parent channel is closed, whichever happens first. -// -// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked. -func ContextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) { - ctx, cancel := context.WithCancel(context.Background()) - - go func() { - select { - case <-parentCh: - cancel() - case <-ctx.Done(): - } - }() - return ctx, cancel -} - // BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides // an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff() // is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in @@ -466,9 +474,7 @@ func PollWithContext(ctx context.Context, interval, timeout time.Duration, condi // PollUntil always waits interval before the first run of 'condition'. // 'condition' will always be invoked at least once. func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { - ctx, cancel := ContextForChannel(stopCh) - defer cancel() - return PollUntilWithContext(ctx, interval, condition.WithContext()) + return PollUntilWithContext(ContextForChannel(stopCh), interval, condition.WithContext()) } // PollUntilWithContext tries a condition func until it returns true, @@ -533,9 +539,7 @@ func PollImmediateWithContext(ctx context.Context, interval, timeout time.Durati // PollImmediateUntil runs the 'condition' before waiting for the interval. // 'condition' will always be invoked at least once. func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { - ctx, cancel := ContextForChannel(stopCh) - defer cancel() - return PollImmediateUntilWithContext(ctx, interval, condition.WithContext()) + return PollImmediateUntilWithContext(ContextForChannel(stopCh), interval, condition.WithContext()) } // PollImmediateUntilWithContext tries a condition func until it returns true, diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go index 267c5fb6875..7f158795e89 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go @@ -523,8 +523,7 @@ func Test_waitFor(t *testing.T) { err := func() error { done := make(chan struct{}) defer close(done) - ctx, cancel := ContextForChannel(done) - defer cancel() + ctx := ContextForChannel(done) return waitForWithContext(ctx, ticker.WithContext(), c.F.WithContext()) }() switch { @@ -547,8 +546,7 @@ func Test_waitForWithEarlyClosing_waitFunc(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - ctx, cancel := ContextForChannel(stopCh) - defer cancel() + ctx := ContextForChannel(stopCh) start := time.Now() err := waitForWithContext(ctx, func(ctx context.Context) <-chan struct{} { c := make(chan struct{}) @@ -575,8 +573,7 @@ func Test_waitForWithClosedChannel(t *testing.T) { close(stopCh) c := make(chan struct{}) defer close(c) - ctx, cancel := ContextForChannel(stopCh) - defer cancel() + ctx := ContextForChannel(stopCh) start := time.Now() err := waitForWithContext(ctx, func(_ context.Context) <-chan struct{} { @@ -699,8 +696,7 @@ func TestContextForChannel(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - ctx, cancel := ContextForChannel(parentCh) - defer cancel() + ctx := ContextForChannel(parentCh) <-ctx.Done() }() } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 5a1407d1cb3..0741e2ba660 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "context" "fmt" "net/http" "strconv" @@ -228,8 +229,8 @@ func (s *EtcdOptions) Complete( } if len(s.EncryptionProviderConfigFilepath) != 0 { - ctxTransformers, closeTransformers := wait.ContextForChannel(stopCh) - ctxServer, _ := wait.ContextForChannel(stopCh) // explicitly ignore cancel here because we do not own the server's lifecycle + ctxServer := wait.ContextForChannel(stopCh) + ctxTransformers, closeTransformers := context.WithCancel(ctxServer) encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(ctxTransformers, s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload) if err != nil { diff --git a/staging/src/k8s.io/cloud-provider/app/controllermanager.go b/staging/src/k8s.io/cloud-provider/app/controllermanager.go index de4b9fd1ca2..3491912021a 100644 --- a/staging/src/k8s.io/cloud-provider/app/controllermanager.go +++ b/staging/src/k8s.io/cloud-provider/app/controllermanager.go @@ -199,7 +199,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface } if !c.ComponentConfig.Generic.LeaderElection.LeaderElect { - ctx, _ := wait.ContextForChannel(stopCh) + ctx := wait.ContextForChannel(stopCh) run(ctx, controllerInitializers) <-stopCh return nil