diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 231f6921cd9..deb3a6ad4da 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -20,6 +20,7 @@ limitations under the License. package app import ( + "context" "crypto/tls" "fmt" "net/http" @@ -114,7 +115,7 @@ cluster's shared state through which all other components interact.`, } // add feature enablement metrics utilfeature.DefaultMutableFeatureGate.AddMetrics() - return Run(completedOptions, genericapiserver.SetupSignalHandler()) + return Run(cmd.Context(), completedOptions) }, Args: func(cmd *cobra.Command, args []string) error { for _, arg := range args { @@ -125,6 +126,7 @@ cluster's shared state through which all other components interact.`, return nil }, } + cmd.SetContext(genericapiserver.SetupSignalContext()) fs := cmd.Flags() namedFlagSets := s.Flags() @@ -142,7 +144,7 @@ cluster's shared state through which all other components interact.`, } // Run runs the specified APIServer. This should never exit. -func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error { +func Run(ctx context.Context, opts options.CompletedOptions) error { // To help debugging, immediately log version klog.Infof("Version: %+v", version.Get()) @@ -166,7 +168,7 @@ func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error { return err } - return prepared.Run(stopCh) + return prepared.Run(ctx) } // CreateServerChain creates the apiservers connected via delegation. diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 357f01b9227..3d00006e38a 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -32,6 +32,7 @@ import ( "os" "path/filepath" "runtime" + "testing" "time" "github.com/spf13/pflag" @@ -56,10 +57,11 @@ import ( "k8s.io/klog/v2" "k8s.io/kube-aggregator/pkg/apiserver" "k8s.io/kubernetes/pkg/features" + testutil "k8s.io/kubernetes/test/utils" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" - testutil "k8s.io/kubernetes/test/utils" ) func init() { @@ -139,7 +141,9 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions { // Note: we return a tear-down func instead of a stop channel because the later will leak temporary // files that because Golang testing's call to os.Exit will not give a stop channel go routine // enough time to remove temporary files. -func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { +func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { + tCtx := ktesting.Init(t) + if instanceOptions == nil { instanceOptions = NewDefaultTestServerOptions() } @@ -149,12 +153,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo return result, fmt.Errorf("failed to create temp dir: %v", err) } - stopCh := make(chan struct{}) var errCh chan error tearDown := func() { - // Closing stopCh is stopping apiserver and cleaning up + // Cancel is stopping apiserver and cleaning up // after itself, including shutting down its storage layer. - close(stopCh) + tCtx.Cancel("tearing down") // If the apiserver was started, let's wait for it to // shutdown clearly. @@ -359,15 +362,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } errCh = make(chan error) - go func(stopCh <-chan struct{}) { + go func() { defer close(errCh) prepared, err := server.PrepareRun() if err != nil { errCh <- err - } else if err := prepared.Run(stopCh); err != nil { + } else if err := prepared.Run(tCtx); err != nil { errCh <- err } - }(stopCh) + }() client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) if err != nil { @@ -465,7 +468,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } // StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed. -func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer { +func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer { result, err := StartTestServer(t, instanceOptions, flags, storageConfig) if err == nil { return &result diff --git a/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go b/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go index 831290d5de9..194746dc11c 100644 --- a/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go +++ b/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go @@ -55,9 +55,6 @@ func NewController( secondaryRange net.IPNet, client clientset.Interface, ) *Controller { - broadcaster := record.NewBroadcaster() - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) - c := &Controller{ client: client, interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval @@ -79,9 +76,6 @@ func NewController( c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer()) c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced - c.eventBroadcaster = broadcaster - c.eventRecorder = recorder - return c } @@ -101,9 +95,12 @@ type Controller struct { } // Start will not return until the default ServiceCIDR exists or stopCh is closed. -func (c *Controller) Start(stopCh <-chan struct{}) { +func (c *Controller) Start(ctx context.Context) { defer utilruntime.HandleCrash() + stopCh := ctx.Done() + c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx)) + c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) c.eventBroadcaster.StartStructuredLogging(0) c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")}) defer c.eventBroadcaster.Shutdown() @@ -116,8 +113,6 @@ func (c *Controller) Start(stopCh <-chan struct{}) { return } - // derive a context from the stopCh so we can cancel the poll loop - ctx := wait.ContextForChannel(stopCh) // wait until first successfully sync // this blocks apiserver startup so poll with a short interval err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 81ca65f494e..f74aba809b6 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -420,7 +420,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) ) // The default serviceCIDR must exist before the apiserver is healthy // otherwise the allocators for Services will not work. - controller.Start(hookContext.StopCh) + controller.Start(hookContext) return nil }) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/main.go b/staging/src/k8s.io/apiextensions-apiserver/main.go index ce08f04fab9..142d5373a4b 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/main.go +++ b/staging/src/k8s.io/apiextensions-apiserver/main.go @@ -25,8 +25,8 @@ import ( ) func main() { - stopCh := genericapiserver.SetupSignalHandler() - cmd := server.NewServerCommand(os.Stdout, os.Stderr, stopCh) + ctx := genericapiserver.SetupSignalContext() + cmd := server.NewServerCommand(ctx, os.Stdout, os.Stderr) code := cli.Run(cmd) os.Exit(code) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go index b30b9cb65cd..0509a6aa6df 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "io" "github.com/spf13/cobra" @@ -25,7 +26,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" ) -func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command { +func NewServerCommand(ctx context.Context, out, errOut io.Writer) *cobra.Command { o := options.NewCustomResourceDefinitionsServerOptions(out, errOut) cmd := &cobra.Command{ @@ -38,19 +39,20 @@ func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Comm if err := o.Validate(); err != nil { return err } - if err := Run(o, stopCh); err != nil { + if err := Run(c.Context(), o); err != nil { return err } return nil }, } + cmd.SetContext(ctx) fs := cmd.Flags() o.AddFlags(fs) return cmd } -func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct{}) error { +func Run(ctx context.Context, o *options.CustomResourceDefinitionsServerOptions) error { config, err := o.Config() if err != nil { return err @@ -60,5 +62,5 @@ func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct if err != nil { return err } - return server.GenericAPIServer.PrepareRun().Run(stopCh) + return server.GenericAPIServer.PrepareRun().RunWithContext(ctx) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go index c27c38abc5f..3f2e7812bf4 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go @@ -18,6 +18,7 @@ package testing import ( "context" + "errors" "fmt" "net" "os" @@ -83,13 +84,15 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions { // files that because Golang testing's call to os.Exit will not give a stop channel go routine // enough time to remove temporary files. func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { - stopCh := make(chan struct{}) + // TODO: this is a candidate for using what is now test/utils/ktesting, + // should that become a staging repo. + ctx, cancel := context.WithCancelCause(context.Background()) var errCh chan error tearDown := func() { - // Closing stopCh is stopping apiextensions apiserver and its + // Cancel is stopping apiextensions apiserver and its // delegates, which itself is cleaning up after itself, // including shutting down its storage layer. - close(stopCh) + cancel(errors.New("tearing down")) // If the apiextensions apiserver was started, let's wait for // it to shutdown clearly. @@ -166,13 +169,13 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin } errCh = make(chan error) - go func(stopCh <-chan struct{}) { + go func() { defer close(errCh) - if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil { + if err := server.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil { errCh <- err } - }(stopCh) + }() t.Logf("Waiting for /healthz to be ok...") diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index a1d6d8902f9..f58f3bf9c2b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -17,6 +17,8 @@ limitations under the License. package server import ( + "context" + "errors" "fmt" "io" "net/http" @@ -42,6 +44,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" "k8s.io/component-base/tracing" + "k8s.io/klog/v2/ktesting" netutils "k8s.io/utils/net" ) @@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) { } func TestNewWithDelegate(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("test is done")) delegateConfig := NewConfig(codecs) delegateConfig.ExternalAddress = "192.168.10.4:443" delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4") @@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) { return nil }) - stopCh := make(chan struct{}) - defer close(stopCh) wrappingServer.PrepareRun() - wrappingServer.RunPostStartHooks(stopCh) + wrappingServer.RunPostStartHooks(ctx) server := httptest.NewServer(wrappingServer.Handler) defer server.Close() diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 6066c0fcbab..e0dcbf75849 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/managedfields" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/admission" @@ -442,9 +443,19 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // Run spawns the secure http server. It only returns if stopCh is closed // or the secure port cannot be listened on initially. -// This is the diagram of what channels/signals are dependent on each other: // -// | stopCh +// Deprecated: use RunWithContext instead. Run will not get removed to avoid +// breaking consumers, but should not be used in new code. +func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { + ctx := wait.ContextForChannel(stopCh) + return s.RunWithContext(ctx) +} + +// RunWithContext spawns the secure http server. It only returns if ctx is canceled +// or the secure port cannot be listened on initially. +// This is the diagram of what contexts/channels/signals are dependent on each other: +// +// | ctx // | | // | --------------------------------------------------------- // | | | @@ -477,12 +488,13 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // | | | | // | |-------------------|---------------------|----------------------------------------| // | | | -// | stopHttpServerCh (AuditBackend::Shutdown()) +// | stopHttpServerCtx (AuditBackend::Shutdown()) // | | // | listenerStoppedCh // | | // | HTTPServerStoppedListening (httpServerStoppedListeningCh) -func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { +func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error { + stopCh := ctx.Done() delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated @@ -544,9 +556,11 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest drainedCh := s.lifecycleSignals.InFlightRequestsDrained - stopHttpServerCh := make(chan struct{}) + // Canceling the parent context does not immediately cancel the HTTP server. + // We only inherit context values here and deal with cancellation ourselves. + stopHTTPServerCtx, stopHTTPServer := context.WithCancelCause(context.WithoutCancel(ctx)) go func() { - defer close(stopHttpServerCh) + defer stopHTTPServer(errors.New("time to stop HTTP server")) timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled() if s.ShutdownSendRetryAfter { @@ -565,7 +579,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { } } - stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout) + stoppedCh, listenerStoppedCh, err := s.NonBlockingRunWithContext(stopHTTPServerCtx, shutdownTimeout) if err != nil { return err } @@ -694,7 +708,18 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. // The returned channel is closed when the (asynchronous) termination is finished. +// +// Deprecated: use RunWithContext instead. Run will not get removed to avoid +// breaking consumers, but should not be used in new code. func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) { + ctx := wait.ContextForChannel(stopCh) + return s.NonBlockingRunWithContext(ctx, shutdownTimeout) +} + +// NonBlockingRunWithContext spawns the secure http server. An error is +// returned if the secure port cannot be listened on. +// The returned channel is closed when the (asynchronous) termination is finished. +func (s preparedGenericAPIServer) NonBlockingRunWithContext(ctx context.Context, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) { // Use an internal stop channel to allow cleanup of the listeners on error. internalStopCh := make(chan struct{}) var stoppedCh <-chan struct{} @@ -712,11 +737,11 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow // responsibility of the caller to close the provided channel to // ensure cleanup. go func() { - <-stopCh + <-ctx.Done() close(internalStopCh) }() - s.RunPostStartHooks(stopCh) + s.RunPostStartHooks(ctx) if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil { klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index 967b9760648..39079c616a6 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -20,6 +20,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "errors" "fmt" "io" "log" @@ -41,6 +42,7 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "github.com/google/go-cmp/cmp" "golang.org/x/net/http2" @@ -200,10 +202,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t }, nil) // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -222,7 +229,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t } // signal termination event: initiate a shutdown - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntilSignaled(t, signals.ShutdownInitiated) // /readyz must return an error, but we need to give it some time @@ -423,10 +430,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t }, nil) // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -445,7 +457,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t } // signal termination event: initiate a shutdown - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntilSignaled(t, signals.ShutdownInitiated) // /readyz must return an error, but we need to give it some time @@ -568,10 +580,15 @@ func TestMuxAndDiscoveryComplete(t *testing.T) { } // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -612,6 +629,9 @@ func TestPreShutdownHooks(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) s := test.server() doer := setupDoer(t, s.SecureServingInfo) @@ -643,14 +663,16 @@ func TestPreShutdownHooks(t *testing.T) { } // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return") diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 091349182e7..3549b238c9f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -52,6 +52,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" + "k8s.io/klog/v2/ktesting" kubeopenapi "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/validation/spec" netutils "k8s.io/utils/net" @@ -317,17 +318,16 @@ func TestInstallAPIGroups(t *testing.T) { } func TestPrepareRun(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) s, config, assert := newMaster(t) assert.NotNil(config.OpenAPIConfig) server := httptest.NewServer(s.Handler.Director) defer server.Close() - done := make(chan struct{}) - defer close(done) s.PrepareRun() - s.RunPostStartHooks(done) + s.RunPostStartHooks(ctx) // openapi is installed in PrepareRun resp, err := http.Get(server.URL + "/openapi/v2") @@ -353,9 +353,10 @@ func TestPrepareRun(t *testing.T) { } func TestUpdateOpenAPISpec(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) s, _, assert := newMaster(t) s.PrepareRun() - s.RunPostStartHooks(make(chan struct{})) + s.RunPostStartHooks(ctx) server := httptest.NewServer(s.Handler.Director) defer server.Close() diff --git a/staging/src/k8s.io/apiserver/pkg/server/hooks.go b/staging/src/k8s.io/apiserver/pkg/server/hooks.go index 065df6bc5cc..1561d7a8475 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/hooks.go +++ b/staging/src/k8s.io/apiserver/pkg/server/hooks.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "errors" "fmt" "net/http" @@ -48,8 +49,13 @@ type PreShutdownHookFunc func() error type PostStartHookContext struct { // LoopbackClientConfig is a config for a privileged loopback connection to the API server LoopbackClientConfig *restclient.Config - // StopCh is the channel that will be closed when the server stops + // StopCh is the channel that will be closed when the server stops. + // + // Deprecated: use the PostStartHookContext itself instead, it contains a context that + // gets cancelled when the server stops. StopCh keeps getting provided for existing code. StopCh <-chan struct{} + // Context gets cancelled when the server stops. + context.Context } // PostStartHookProvider is an interface in addition to provide a post start hook for the api server @@ -151,15 +157,16 @@ func (s *GenericAPIServer) AddPreShutdownHookOrDie(name string, hook PreShutdown } } -// RunPostStartHooks runs the PostStartHooks for the server -func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) { +// RunPostStartHooks runs the PostStartHooks for the server. +func (s *GenericAPIServer) RunPostStartHooks(ctx context.Context) { s.postStartHookLock.Lock() defer s.postStartHookLock.Unlock() s.postStartHooksCalled = true context := PostStartHookContext{ LoopbackClientConfig: s.LoopbackClientConfig, - StopCh: stopCh, + StopCh: ctx.Done(), + Context: ctx, } for hookName, hookEntry := range s.postStartHooks { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go index f1ca80cb2e8..1ccccb4177d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go @@ -18,6 +18,7 @@ package options import ( "bytes" + "context" cryptorand "crypto/rand" "crypto/rsa" "crypto/tls" @@ -25,6 +26,7 @@ import ( "crypto/x509/pkix" "encoding/base64" "encoding/pem" + "errors" "fmt" "io/ioutil" "math/big" @@ -44,6 +46,7 @@ import ( "k8s.io/client-go/discovery" restclient "k8s.io/client-go/rest" cliflag "k8s.io/component-base/cli/flag" + "k8s.io/klog/v2/ktesting" netutils "k8s.io/utils/net" ) @@ -215,6 +218,10 @@ func TestServerRunWithSNI(t *testing.T) { test := tests[title] t.Run(title, func(t *testing.T) { t.Parallel() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("test has completed")) + // create server cert certDir := "testdata/" + specToName(test.Cert) serverCertBundleFile := filepath.Join(certDir, "cert") @@ -267,9 +274,6 @@ func TestServerRunWithSNI(t *testing.T) { signatures[sig] = j } - stopCh := make(chan struct{}) - defer close(stopCh) - // launch server config := setUp(t) @@ -316,7 +320,7 @@ func TestServerRunWithSNI(t *testing.T) { preparedServer := s.PrepareRun() preparedServerErrors := make(chan error) go func() { - if err := preparedServer.Run(stopCh); err != nil { + if err := preparedServer.RunWithContext(ctx); err != nil { preparedServerErrors <- err } }() diff --git a/staging/src/k8s.io/kube-aggregator/main.go b/staging/src/k8s.io/kube-aggregator/main.go index ed8d45f6a35..863a8ea72b7 100644 --- a/staging/src/k8s.io/kube-aggregator/main.go +++ b/staging/src/k8s.io/kube-aggregator/main.go @@ -32,9 +32,9 @@ import ( ) func main() { - stopCh := genericapiserver.SetupSignalHandler() + ctx := genericapiserver.SetupSignalContext() options := server.NewDefaultOptions(os.Stdout, os.Stderr) - cmd := server.NewCommandStartAggregator(options, stopCh) + cmd := server.NewCommandStartAggregator(ctx, options) code := cli.Run(cmd) os.Exit(code) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index cedc9743538..16ab2bdfa6a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -115,7 +115,7 @@ type CompletedConfig struct { } type runnable interface { - Run(stopCh <-chan struct{}) error + RunWithContext(ctx context.Context) error } // preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked. @@ -479,8 +479,8 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil } -func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error { - return s.runnable.Run(stopCh) +func (s preparedAPIAggregator) Run(ctx context.Context) error { + return s.runnable.RunWithContext(ctx) } // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index 88abac45418..d4ddf9490ae 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "errors" "fmt" "io" @@ -55,7 +56,7 @@ type AggregatorOptions struct { // NewCommandStartAggregator provides a CLI handler for 'start master' command // with a default AggregatorOptions. -func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct{}) *cobra.Command { +func NewCommandStartAggregator(ctx context.Context, defaults *AggregatorOptions) *cobra.Command { o := *defaults cmd := &cobra.Command{ Short: "Launch a API aggregator and proxy server", @@ -67,12 +68,13 @@ func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct if err := o.Validate(args); err != nil { return err } - if err := o.RunAggregator(stopCh); err != nil { + if err := o.RunAggregator(c.Context()); err != nil { return err } return nil }, } + cmd.SetContext(ctx) o.AddFlags(cmd.Flags()) return cmd @@ -119,7 +121,7 @@ func (o *AggregatorOptions) Complete() error { } // RunAggregator runs the API Aggregator. -func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { +func (o AggregatorOptions) RunAggregator(ctx context.Context) error { // TODO have a "real" external address if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, nil); err != nil { return fmt.Errorf("error creating self-signed certificates: %v", err) @@ -171,5 +173,5 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { if err != nil { return err } - return prepared.Run(stopCh) + return prepared.Run(ctx) } diff --git a/staging/src/k8s.io/sample-apiserver/main.go b/staging/src/k8s.io/sample-apiserver/main.go index 708a2a43125..7dd8718c804 100644 --- a/staging/src/k8s.io/sample-apiserver/main.go +++ b/staging/src/k8s.io/sample-apiserver/main.go @@ -25,9 +25,9 @@ import ( ) func main() { - stopCh := genericapiserver.SetupSignalHandler() + ctx := genericapiserver.SetupSignalContext() options := server.NewWardleServerOptions(os.Stdout, os.Stderr) - cmd := server.NewCommandStartWardleServer(options, stopCh) + cmd := server.NewCommandStartWardleServer(ctx, options) code := cli.Run(cmd) os.Exit(code) } diff --git a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go index c39ef3e7bbf..d6310425a50 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "fmt" "io" "net" @@ -71,7 +72,7 @@ func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions { // NewCommandStartWardleServer provides a CLI handler for 'start master' command // with a default WardleServerOptions. -func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan struct{}) *cobra.Command { +func NewCommandStartWardleServer(ctx context.Context, defaults *WardleServerOptions) *cobra.Command { o := *defaults cmd := &cobra.Command{ Short: "Launch a wardle API server", @@ -83,12 +84,13 @@ func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan st if err := o.Validate(args); err != nil { return err } - if err := o.RunWardleServer(stopCh); err != nil { + if err := o.RunWardleServer(c.Context()); err != nil { return err } return nil }, } + cmd.SetContext(ctx) flags := cmd.Flags() o.RecommendedOptions.AddFlags(flags) @@ -154,7 +156,7 @@ func (o *WardleServerOptions) Config() (*apiserver.Config, error) { } // RunWardleServer starts a new WardleServer given WardleServerOptions -func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error { +func (o WardleServerOptions) RunWardleServer(ctx context.Context) error { config, err := o.Config() if err != nil { return err @@ -171,5 +173,5 @@ func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error { return nil }) - return server.GenericAPIServer.PrepareRun().Run(stopCh) + return server.GenericAPIServer.PrepareRun().RunWithContext(ctx) } diff --git a/test/e2e_node/services/apiserver.go b/test/e2e_node/services/apiserver.go index f8fe58bcea0..4488715b3bd 100644 --- a/test/e2e_node/services/apiserver.go +++ b/test/e2e_node/services/apiserver.go @@ -17,6 +17,8 @@ limitations under the License. package services import ( + "context" + "errors" "fmt" "os" "testing" @@ -45,19 +47,20 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0 // APIServer is a server which manages apiserver. type APIServer struct { storageConfig storagebackend.Config - stopCh chan struct{} + cancel func(error) } // NewAPIServer creates an apiserver. func NewAPIServer(storageConfig storagebackend.Config) *APIServer { return &APIServer{ storageConfig: storageConfig, - stopCh: make(chan struct{}), } } // Start starts the apiserver, returns when apiserver is ready. -func (a *APIServer) Start() error { +// The background goroutine runs until the context is canceled +// or Stop is called, whether happens first. +func (a *APIServer) Start(ctx context.Context) error { const tokenFilePath = "known_tokens.csv" o := options.NewServerRunOptions() @@ -93,9 +96,12 @@ func (a *APIServer) Start() error { o.KubeletConfig.PreferredAddressTypes = []string{"InternalIP"} + ctx, cancel := context.WithCancelCause(ctx) + a.cancel = cancel errCh := make(chan error) go func() { defer close(errCh) + defer cancel(errors.New("shutting down")) // Calling Stop is optional, but cancel always should be invoked. completedOptions, err := o.Complete() if err != nil { errCh <- fmt.Errorf("set apiserver default options error: %w", err) @@ -106,7 +112,7 @@ func (a *APIServer) Start() error { return } - err = apiserver.Run(completedOptions, a.stopCh) + err = apiserver.Run(ctx, completedOptions) if err != nil { errCh <- fmt.Errorf("run apiserver error: %w", err) return @@ -120,12 +126,11 @@ func (a *APIServer) Start() error { return nil } -// Stop stops the apiserver. Currently, there is no way to stop the apiserver. -// The function is here only for completion. +// Stop stops the apiserver. Does not block. func (a *APIServer) Stop() error { - if a.stopCh != nil { - close(a.stopCh) - a.stopCh = nil + // nil when Start has never been called. + if a.cancel != nil { + a.cancel(errors.New("stopping API server")) } return nil } diff --git a/test/e2e_node/services/internal_services.go b/test/e2e_node/services/internal_services.go index 06e50ec9b2f..f2254d4ff7e 100644 --- a/test/e2e_node/services/internal_services.go +++ b/test/e2e_node/services/internal_services.go @@ -24,8 +24,8 @@ import ( etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" "k8s.io/apiserver/pkg/storage/storagebackend" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/klog/v2" ) @@ -58,17 +58,17 @@ func (es *e2eServices) run(t *testing.T) error { // start starts the tests embedded services or returns an error. func (es *e2eServices) start(t *testing.T) error { - _, ctx := ktesting.NewTestContext(t) + tCtx := ktesting.Init(t) klog.Info("Starting e2e services...") err := es.startEtcd(t) if err != nil { return err } - err = es.startAPIServer(es.etcdStorage) + err = es.startAPIServer(tCtx, es.etcdStorage) if err != nil { return err } - err = es.startNamespaceController(ctx) + err = es.startNamespaceController(tCtx) if err != nil { return nil } @@ -121,10 +121,10 @@ func (es *e2eServices) startEtcd(t *testing.T) error { } // startAPIServer starts the embedded API server or returns an error. -func (es *e2eServices) startAPIServer(etcdStorage *storagebackend.Config) error { +func (es *e2eServices) startAPIServer(ctx context.Context, etcdStorage *storagebackend.Config) error { klog.Info("Starting API server") es.apiServer = NewAPIServer(*etcdStorage) - return es.apiServer.Start() + return es.apiServer.Start(ctx) } // startNamespaceController starts the embedded namespace controller or returns an error. diff --git a/test/integration/controlplane/transformation/secrets_transformation_test.go b/test/integration/controlplane/transformation/secrets_transformation_test.go index 53a62739ae3..d554cc50f9e 100644 --- a/test/integration/controlplane/transformation/secrets_transformation_test.go +++ b/test/integration/controlplane/transformation/secrets_transformation_test.go @@ -94,7 +94,7 @@ func TestSecretsShouldBeTransformed(t *testing.T) { if err != nil { t.Fatalf("Failed to create test secret, error: %v", err) } - test.runResource(test.logger, tt.unSealFunc, tt.transformerPrefix, "", "v1", "secrets", test.secret.Name, test.secret.Namespace) + test.runResource(test.TContext, tt.unSealFunc, tt.transformerPrefix, "", "v1", "secrets", test.secret.Name, test.secret.Namespace) test.cleanUp() } } diff --git a/test/integration/controlplane/transformation/transformation_test.go b/test/integration/controlplane/transformation/transformation_test.go index f4caa8e6974..fa9b74157ef 100644 --- a/test/integration/controlplane/transformation/transformation_test.go +++ b/test/integration/controlplane/transformation/transformation_test.go @@ -50,6 +50,7 @@ import ( "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/pointer" "sigs.k8s.io/yaml" ) @@ -75,7 +76,7 @@ const ( type unSealSecret func(ctx context.Context, cipherText []byte, dataCtx value.Context, config apiserverv1.ProviderConfiguration) ([]byte, error) type transformTest struct { - logger kubeapiservertesting.Logger + ktesting.TContext storageConfig *storagebackend.Config configDir string transformerConfig string @@ -85,12 +86,13 @@ type transformTest struct { secret *corev1.Secret } -func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML string, reload bool, configDir string, storageConfig *storagebackend.Config) (*transformTest, error) { +func newTransformTest(tb testing.TB, transformerConfigYAML string, reload bool, configDir string, storageConfig *storagebackend.Config) (*transformTest, error) { + tCtx := ktesting.Init(tb) if storageConfig == nil { storageConfig = framework.SharedEtcd() } e := transformTest{ - logger: l, + TContext: tCtx, transformerConfig: transformerConfigYAML, storageConfig: storageConfig, } @@ -113,7 +115,7 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin return nil, fmt.Errorf("failed to read config file: %w", err) } - if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, nil, e.getEncryptionOptions(reload), e.storageConfig); err != nil { + if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(tb, nil, e.getEncryptionOptions(reload), e.storageConfig); err != nil { e.cleanUp() return nil, fmt.Errorf("failed to start KubeAPI server: %w", err) } @@ -131,11 +133,11 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin if transformerConfigYAML != "" && reload { // when reloading is enabled, this healthz endpoint is always present - mustBeHealthy(l, "/kms-providers", "ok", e.kubeAPIServer.ClientConfig) - mustNotHaveLivez(l, "/kms-providers", "404 page not found", e.kubeAPIServer.ClientConfig) + mustBeHealthy(tCtx, "/kms-providers", "ok", e.kubeAPIServer.ClientConfig) + mustNotHaveLivez(tCtx, "/kms-providers", "404 page not found", e.kubeAPIServer.ClientConfig) // excluding healthz endpoints even if they do not exist should work - mustBeHealthy(l, "", `warn: some health checks cannot be excluded: no matches for "kms-provider-0","kms-provider-1","kms-provider-2","kms-provider-3"`, + mustBeHealthy(tCtx, "", `warn: some health checks cannot be excluded: no matches for "kms-provider-0","kms-provider-1","kms-provider-2","kms-provider-3"`, e.kubeAPIServer.ClientConfig, "kms-provider-0", "kms-provider-1", "kms-provider-2", "kms-provider-3") } @@ -530,7 +532,7 @@ func (e *transformTest) writeRawRecordToETCD(path string, data []byte) (*clientv } func (e *transformTest) printMetrics() error { - e.logger.Logf("Transformation Metrics:") + e.Logf("Transformation Metrics:") metrics, err := legacyregistry.DefaultGatherer.Gather() if err != nil { return fmt.Errorf("failed to gather metrics: %s", err) @@ -538,9 +540,9 @@ func (e *transformTest) printMetrics() error { for _, mf := range metrics { if strings.HasPrefix(*mf.Name, metricsPrefix) { - e.logger.Logf("%s", *mf.Name) + e.Logf("%s", *mf.Name) for _, metric := range mf.GetMetric() { - e.logger.Logf("%v", metric) + e.Logf("%v", metric) } } } diff --git a/test/integration/etcd/server.go b/test/integration/etcd/server.go index 2a70e79919e..ed69e3d532f 100644 --- a/test/integration/etcd/server.go +++ b/test/integration/etcd/server.go @@ -49,6 +49,7 @@ import ( "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/utils/ktesting" netutils "k8s.io/utils/net" // install all APIs @@ -64,6 +65,8 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0 // StartRealAPIServerOrDie starts an API server that is appropriate for use in tests that require one of every resource func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOptions)) *APIServer { + tCtx := ktesting.Init(t) + certDir, err := os.MkdirTemp("", t.Name()) if err != nil { t.Fatal(err) @@ -148,7 +151,6 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu kubeClient := clientset.NewForConfigOrDie(kubeClientConfig) - stopCh := make(chan struct{}) errCh := make(chan error) go func() { // Catch panics that occur in this go routine so we get a comprehensible failure @@ -164,7 +166,7 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu errCh <- err return } - if err := prepared.Run(stopCh); err != nil { + if err := prepared.Run(tCtx); err != nil { errCh <- err t.Error(err) return @@ -215,9 +217,9 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu } cleanup := func() { - // Closing stopCh is stopping apiserver and cleaning up + // Cancel stopping apiserver and cleaning up // after itself, including shutting down its storage layer. - close(stopCh) + tCtx.Cancel("cleaning up") // If the apiserver was started, let's wait for it to // shutdown clearly. diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 5465ac69bb2..a0c3852618b 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -62,9 +62,6 @@ func TestAPIServiceWaitOnStart(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) t.Cleanup(cancel) - stopCh := make(chan struct{}) - defer close(stopCh) - etcdConfig := framework.SharedEtcd() etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport) @@ -235,9 +232,6 @@ func TestAggregatedAPIServer(t *testing.T) { // makes the kube-apiserver very responsive. it's normally a minute dynamiccertificates.FileRefreshDuration = 1 * time.Second - stopCh := make(chan struct{}) - defer close(stopCh) - // we need the wardle port information first to set up the service resolver listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{}) if err != nil { @@ -291,7 +285,7 @@ func TestAggregatedAPIServer(t *testing.T) { } o.RecommendedOptions.SecureServing.Listener = listener o.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1") - wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh) + wardleCmd := sampleserver.NewCommandStartWardleServer(ctx, o) wardleCmd.SetArgs([]string{ "--authentication-kubeconfig", wardleToKASKubeConfigFile, "--authorization-kubeconfig", wardleToKASKubeConfigFile, diff --git a/test/integration/framework/test_server.go b/test/integration/framework/test_server.go index 353dfd3f756..e1d2c64efa9 100644 --- a/test/integration/framework/test_server.go +++ b/test/integration/framework/test_server.go @@ -176,7 +176,7 @@ func StartTestServer(ctx context.Context, t testing.TB, setup TestServerSetup) ( errCh = make(chan error) go func() { defer close(errCh) - if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().Run(ctx.Done()); err != nil { + if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil { errCh <- err } }()