From 591855966c1d136c8fca299db2c6ba949bef4493 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 1 Dec 2023 09:00:59 +0100 Subject: [PATCH 1/2] sample controller: enhance context support 27a68aee3a4834 introduced context support for events. Creating an event broadcaster with context makes tests more resilient against leaking goroutines when that context gets canceled at the end of a test and enables per-test output via ktesting. The New method already had a context, therefore no API changes are needed. --- staging/src/k8s.io/sample-controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/staging/src/k8s.io/sample-controller/controller.go b/staging/src/k8s.io/sample-controller/controller.go index 59c0035a53a..3c7936aeae1 100644 --- a/staging/src/k8s.io/sample-controller/controller.go +++ b/staging/src/k8s.io/sample-controller/controller.go @@ -101,7 +101,7 @@ func NewController( utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme)) logger.V(4).Info("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) From b92273a760503cc57aba37c4d3a28554f7fec7f8 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 1 Dec 2023 09:00:59 +0100 Subject: [PATCH 2/2] apiserver + controllers: enhance context support 27a68aee3a4834 introduced context support for events. Creating an event broadcaster with context makes tests more resilient against leaking goroutines when that context gets canceled at the end of a test and enables per-test output via ktesting. The context could get passed to the constructor. A cleaner solution is to enhance context support for the apiserver and then pass the context into the controller's run method. This ripples up the call stack to all places which start an apiserver. --- cmd/kube-apiserver/app/server.go | 8 ++-- cmd/kube-apiserver/app/testing/testserver.go | 21 +++++---- .../default_servicecidr_controller.go | 13 ++---- pkg/controlplane/instance.go | 2 +- .../k8s.io/apiextensions-apiserver/main.go | 4 +- .../pkg/cmd/server/server.go | 10 +++-- .../pkg/cmd/server/testing/testserver.go | 15 ++++--- .../apiserver/pkg/server/config_test.go | 10 +++-- .../apiserver/pkg/server/genericapiserver.go | 43 ++++++++++++++---- ...ericapiserver_graceful_termination_test.go | 44 ++++++++++++++----- .../pkg/server/genericapiserver_test.go | 9 ++-- .../src/k8s.io/apiserver/pkg/server/hooks.go | 15 +++++-- .../pkg/server/options/serving_test.go | 12 +++-- staging/src/k8s.io/kube-aggregator/main.go | 4 +- .../pkg/apiserver/apiserver.go | 6 +-- .../kube-aggregator/pkg/cmd/server/start.go | 10 +++-- staging/src/k8s.io/sample-apiserver/main.go | 4 +- .../sample-apiserver/pkg/cmd/server/start.go | 10 +++-- test/e2e_node/services/apiserver.go | 23 ++++++---- test/e2e_node/services/internal_services.go | 12 ++--- .../secrets_transformation_test.go | 2 +- .../transformation/transformation_test.go | 22 +++++----- test/integration/etcd/server.go | 10 +++-- test/integration/examples/apiserver_test.go | 8 +--- test/integration/framework/test_server.go | 2 +- 25 files changed, 197 insertions(+), 122 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 231f6921cd9..deb3a6ad4da 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -20,6 +20,7 @@ limitations under the License. package app import ( + "context" "crypto/tls" "fmt" "net/http" @@ -114,7 +115,7 @@ cluster's shared state through which all other components interact.`, } // add feature enablement metrics utilfeature.DefaultMutableFeatureGate.AddMetrics() - return Run(completedOptions, genericapiserver.SetupSignalHandler()) + return Run(cmd.Context(), completedOptions) }, Args: func(cmd *cobra.Command, args []string) error { for _, arg := range args { @@ -125,6 +126,7 @@ cluster's shared state through which all other components interact.`, return nil }, } + cmd.SetContext(genericapiserver.SetupSignalContext()) fs := cmd.Flags() namedFlagSets := s.Flags() @@ -142,7 +144,7 @@ cluster's shared state through which all other components interact.`, } // Run runs the specified APIServer. This should never exit. -func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error { +func Run(ctx context.Context, opts options.CompletedOptions) error { // To help debugging, immediately log version klog.Infof("Version: %+v", version.Get()) @@ -166,7 +168,7 @@ func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error { return err } - return prepared.Run(stopCh) + return prepared.Run(ctx) } // CreateServerChain creates the apiservers connected via delegation. diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 357f01b9227..3d00006e38a 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -32,6 +32,7 @@ import ( "os" "path/filepath" "runtime" + "testing" "time" "github.com/spf13/pflag" @@ -56,10 +57,11 @@ import ( "k8s.io/klog/v2" "k8s.io/kube-aggregator/pkg/apiserver" "k8s.io/kubernetes/pkg/features" + testutil "k8s.io/kubernetes/test/utils" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" - testutil "k8s.io/kubernetes/test/utils" ) func init() { @@ -139,7 +141,9 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions { // Note: we return a tear-down func instead of a stop channel because the later will leak temporary // files that because Golang testing's call to os.Exit will not give a stop channel go routine // enough time to remove temporary files. -func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { +func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { + tCtx := ktesting.Init(t) + if instanceOptions == nil { instanceOptions = NewDefaultTestServerOptions() } @@ -149,12 +153,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo return result, fmt.Errorf("failed to create temp dir: %v", err) } - stopCh := make(chan struct{}) var errCh chan error tearDown := func() { - // Closing stopCh is stopping apiserver and cleaning up + // Cancel is stopping apiserver and cleaning up // after itself, including shutting down its storage layer. - close(stopCh) + tCtx.Cancel("tearing down") // If the apiserver was started, let's wait for it to // shutdown clearly. @@ -359,15 +362,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } errCh = make(chan error) - go func(stopCh <-chan struct{}) { + go func() { defer close(errCh) prepared, err := server.PrepareRun() if err != nil { errCh <- err - } else if err := prepared.Run(stopCh); err != nil { + } else if err := prepared.Run(tCtx); err != nil { errCh <- err } - }(stopCh) + }() client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) if err != nil { @@ -465,7 +468,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } // StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed. -func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer { +func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer { result, err := StartTestServer(t, instanceOptions, flags, storageConfig) if err == nil { return &result diff --git a/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go b/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go index 831290d5de9..194746dc11c 100644 --- a/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go +++ b/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go @@ -55,9 +55,6 @@ func NewController( secondaryRange net.IPNet, client clientset.Interface, ) *Controller { - broadcaster := record.NewBroadcaster() - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) - c := &Controller{ client: client, interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval @@ -79,9 +76,6 @@ func NewController( c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer()) c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced - c.eventBroadcaster = broadcaster - c.eventRecorder = recorder - return c } @@ -101,9 +95,12 @@ type Controller struct { } // Start will not return until the default ServiceCIDR exists or stopCh is closed. -func (c *Controller) Start(stopCh <-chan struct{}) { +func (c *Controller) Start(ctx context.Context) { defer utilruntime.HandleCrash() + stopCh := ctx.Done() + c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx)) + c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) c.eventBroadcaster.StartStructuredLogging(0) c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")}) defer c.eventBroadcaster.Shutdown() @@ -116,8 +113,6 @@ func (c *Controller) Start(stopCh <-chan struct{}) { return } - // derive a context from the stopCh so we can cancel the poll loop - ctx := wait.ContextForChannel(stopCh) // wait until first successfully sync // this blocks apiserver startup so poll with a short interval err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 81ca65f494e..f74aba809b6 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -420,7 +420,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) ) // The default serviceCIDR must exist before the apiserver is healthy // otherwise the allocators for Services will not work. - controller.Start(hookContext.StopCh) + controller.Start(hookContext) return nil }) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/main.go b/staging/src/k8s.io/apiextensions-apiserver/main.go index ce08f04fab9..142d5373a4b 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/main.go +++ b/staging/src/k8s.io/apiextensions-apiserver/main.go @@ -25,8 +25,8 @@ import ( ) func main() { - stopCh := genericapiserver.SetupSignalHandler() - cmd := server.NewServerCommand(os.Stdout, os.Stderr, stopCh) + ctx := genericapiserver.SetupSignalContext() + cmd := server.NewServerCommand(ctx, os.Stdout, os.Stderr) code := cli.Run(cmd) os.Exit(code) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go index b30b9cb65cd..0509a6aa6df 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "io" "github.com/spf13/cobra" @@ -25,7 +26,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" ) -func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command { +func NewServerCommand(ctx context.Context, out, errOut io.Writer) *cobra.Command { o := options.NewCustomResourceDefinitionsServerOptions(out, errOut) cmd := &cobra.Command{ @@ -38,19 +39,20 @@ func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Comm if err := o.Validate(); err != nil { return err } - if err := Run(o, stopCh); err != nil { + if err := Run(c.Context(), o); err != nil { return err } return nil }, } + cmd.SetContext(ctx) fs := cmd.Flags() o.AddFlags(fs) return cmd } -func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct{}) error { +func Run(ctx context.Context, o *options.CustomResourceDefinitionsServerOptions) error { config, err := o.Config() if err != nil { return err @@ -60,5 +62,5 @@ func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct if err != nil { return err } - return server.GenericAPIServer.PrepareRun().Run(stopCh) + return server.GenericAPIServer.PrepareRun().RunWithContext(ctx) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go index c27c38abc5f..3f2e7812bf4 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go @@ -18,6 +18,7 @@ package testing import ( "context" + "errors" "fmt" "net" "os" @@ -83,13 +84,15 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions { // files that because Golang testing's call to os.Exit will not give a stop channel go routine // enough time to remove temporary files. func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) { - stopCh := make(chan struct{}) + // TODO: this is a candidate for using what is now test/utils/ktesting, + // should that become a staging repo. + ctx, cancel := context.WithCancelCause(context.Background()) var errCh chan error tearDown := func() { - // Closing stopCh is stopping apiextensions apiserver and its + // Cancel is stopping apiextensions apiserver and its // delegates, which itself is cleaning up after itself, // including shutting down its storage layer. - close(stopCh) + cancel(errors.New("tearing down")) // If the apiextensions apiserver was started, let's wait for // it to shutdown clearly. @@ -166,13 +169,13 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin } errCh = make(chan error) - go func(stopCh <-chan struct{}) { + go func() { defer close(errCh) - if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil { + if err := server.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil { errCh <- err } - }(stopCh) + }() t.Logf("Waiting for /healthz to be ok...") diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index a1d6d8902f9..f58f3bf9c2b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -17,6 +17,8 @@ limitations under the License. package server import ( + "context" + "errors" "fmt" "io" "net/http" @@ -42,6 +44,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" "k8s.io/component-base/tracing" + "k8s.io/klog/v2/ktesting" netutils "k8s.io/utils/net" ) @@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) { } func TestNewWithDelegate(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("test is done")) delegateConfig := NewConfig(codecs) delegateConfig.ExternalAddress = "192.168.10.4:443" delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4") @@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) { return nil }) - stopCh := make(chan struct{}) - defer close(stopCh) wrappingServer.PrepareRun() - wrappingServer.RunPostStartHooks(stopCh) + wrappingServer.RunPostStartHooks(ctx) server := httptest.NewServer(wrappingServer.Handler) defer server.Close() diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 6066c0fcbab..e0dcbf75849 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/managedfields" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/admission" @@ -442,9 +443,19 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // Run spawns the secure http server. It only returns if stopCh is closed // or the secure port cannot be listened on initially. -// This is the diagram of what channels/signals are dependent on each other: // -// | stopCh +// Deprecated: use RunWithContext instead. Run will not get removed to avoid +// breaking consumers, but should not be used in new code. +func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { + ctx := wait.ContextForChannel(stopCh) + return s.RunWithContext(ctx) +} + +// RunWithContext spawns the secure http server. It only returns if ctx is canceled +// or the secure port cannot be listened on initially. +// This is the diagram of what contexts/channels/signals are dependent on each other: +// +// | ctx // | | // | --------------------------------------------------------- // | | | @@ -477,12 +488,13 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // | | | | // | |-------------------|---------------------|----------------------------------------| // | | | -// | stopHttpServerCh (AuditBackend::Shutdown()) +// | stopHttpServerCtx (AuditBackend::Shutdown()) // | | // | listenerStoppedCh // | | // | HTTPServerStoppedListening (httpServerStoppedListeningCh) -func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { +func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error { + stopCh := ctx.Done() delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated @@ -544,9 +556,11 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest drainedCh := s.lifecycleSignals.InFlightRequestsDrained - stopHttpServerCh := make(chan struct{}) + // Canceling the parent context does not immediately cancel the HTTP server. + // We only inherit context values here and deal with cancellation ourselves. + stopHTTPServerCtx, stopHTTPServer := context.WithCancelCause(context.WithoutCancel(ctx)) go func() { - defer close(stopHttpServerCh) + defer stopHTTPServer(errors.New("time to stop HTTP server")) timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled() if s.ShutdownSendRetryAfter { @@ -565,7 +579,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { } } - stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout) + stoppedCh, listenerStoppedCh, err := s.NonBlockingRunWithContext(stopHTTPServerCtx, shutdownTimeout) if err != nil { return err } @@ -694,7 +708,18 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. // The returned channel is closed when the (asynchronous) termination is finished. +// +// Deprecated: use RunWithContext instead. Run will not get removed to avoid +// breaking consumers, but should not be used in new code. func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) { + ctx := wait.ContextForChannel(stopCh) + return s.NonBlockingRunWithContext(ctx, shutdownTimeout) +} + +// NonBlockingRunWithContext spawns the secure http server. An error is +// returned if the secure port cannot be listened on. +// The returned channel is closed when the (asynchronous) termination is finished. +func (s preparedGenericAPIServer) NonBlockingRunWithContext(ctx context.Context, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) { // Use an internal stop channel to allow cleanup of the listeners on error. internalStopCh := make(chan struct{}) var stoppedCh <-chan struct{} @@ -712,11 +737,11 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow // responsibility of the caller to close the provided channel to // ensure cleanup. go func() { - <-stopCh + <-ctx.Done() close(internalStopCh) }() - s.RunPostStartHooks(stopCh) + s.RunPostStartHooks(ctx) if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil { klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go index 967b9760648..39079c616a6 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go @@ -20,6 +20,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "errors" "fmt" "io" "log" @@ -41,6 +42,7 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "github.com/google/go-cmp/cmp" "golang.org/x/net/http2" @@ -200,10 +202,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t }, nil) // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -222,7 +229,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t } // signal termination event: initiate a shutdown - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntilSignaled(t, signals.ShutdownInitiated) // /readyz must return an error, but we need to give it some time @@ -423,10 +430,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t }, nil) // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -445,7 +457,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t } // signal termination event: initiate a shutdown - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntilSignaled(t, signals.ShutdownInitiated) // /readyz must return an error, but we need to give it some time @@ -568,10 +580,15 @@ func TestMuxAndDiscoveryComplete(t *testing.T) { } // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -612,6 +629,9 @@ func TestPreShutdownHooks(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) s := test.server() doer := setupDoer(t, s.SecureServingInfo) @@ -643,14 +663,16 @@ func TestPreShutdownHooks(t *testing.T) { } // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return") diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 091349182e7..3549b238c9f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -52,6 +52,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" + "k8s.io/klog/v2/ktesting" kubeopenapi "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/validation/spec" netutils "k8s.io/utils/net" @@ -317,17 +318,16 @@ func TestInstallAPIGroups(t *testing.T) { } func TestPrepareRun(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) s, config, assert := newMaster(t) assert.NotNil(config.OpenAPIConfig) server := httptest.NewServer(s.Handler.Director) defer server.Close() - done := make(chan struct{}) - defer close(done) s.PrepareRun() - s.RunPostStartHooks(done) + s.RunPostStartHooks(ctx) // openapi is installed in PrepareRun resp, err := http.Get(server.URL + "/openapi/v2") @@ -353,9 +353,10 @@ func TestPrepareRun(t *testing.T) { } func TestUpdateOpenAPISpec(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) s, _, assert := newMaster(t) s.PrepareRun() - s.RunPostStartHooks(make(chan struct{})) + s.RunPostStartHooks(ctx) server := httptest.NewServer(s.Handler.Director) defer server.Close() diff --git a/staging/src/k8s.io/apiserver/pkg/server/hooks.go b/staging/src/k8s.io/apiserver/pkg/server/hooks.go index 065df6bc5cc..1561d7a8475 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/hooks.go +++ b/staging/src/k8s.io/apiserver/pkg/server/hooks.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "errors" "fmt" "net/http" @@ -48,8 +49,13 @@ type PreShutdownHookFunc func() error type PostStartHookContext struct { // LoopbackClientConfig is a config for a privileged loopback connection to the API server LoopbackClientConfig *restclient.Config - // StopCh is the channel that will be closed when the server stops + // StopCh is the channel that will be closed when the server stops. + // + // Deprecated: use the PostStartHookContext itself instead, it contains a context that + // gets cancelled when the server stops. StopCh keeps getting provided for existing code. StopCh <-chan struct{} + // Context gets cancelled when the server stops. + context.Context } // PostStartHookProvider is an interface in addition to provide a post start hook for the api server @@ -151,15 +157,16 @@ func (s *GenericAPIServer) AddPreShutdownHookOrDie(name string, hook PreShutdown } } -// RunPostStartHooks runs the PostStartHooks for the server -func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) { +// RunPostStartHooks runs the PostStartHooks for the server. +func (s *GenericAPIServer) RunPostStartHooks(ctx context.Context) { s.postStartHookLock.Lock() defer s.postStartHookLock.Unlock() s.postStartHooksCalled = true context := PostStartHookContext{ LoopbackClientConfig: s.LoopbackClientConfig, - StopCh: stopCh, + StopCh: ctx.Done(), + Context: ctx, } for hookName, hookEntry := range s.postStartHooks { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go index f1ca80cb2e8..1ccccb4177d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go @@ -18,6 +18,7 @@ package options import ( "bytes" + "context" cryptorand "crypto/rand" "crypto/rsa" "crypto/tls" @@ -25,6 +26,7 @@ import ( "crypto/x509/pkix" "encoding/base64" "encoding/pem" + "errors" "fmt" "io/ioutil" "math/big" @@ -44,6 +46,7 @@ import ( "k8s.io/client-go/discovery" restclient "k8s.io/client-go/rest" cliflag "k8s.io/component-base/cli/flag" + "k8s.io/klog/v2/ktesting" netutils "k8s.io/utils/net" ) @@ -215,6 +218,10 @@ func TestServerRunWithSNI(t *testing.T) { test := tests[title] t.Run(title, func(t *testing.T) { t.Parallel() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("test has completed")) + // create server cert certDir := "testdata/" + specToName(test.Cert) serverCertBundleFile := filepath.Join(certDir, "cert") @@ -267,9 +274,6 @@ func TestServerRunWithSNI(t *testing.T) { signatures[sig] = j } - stopCh := make(chan struct{}) - defer close(stopCh) - // launch server config := setUp(t) @@ -316,7 +320,7 @@ func TestServerRunWithSNI(t *testing.T) { preparedServer := s.PrepareRun() preparedServerErrors := make(chan error) go func() { - if err := preparedServer.Run(stopCh); err != nil { + if err := preparedServer.RunWithContext(ctx); err != nil { preparedServerErrors <- err } }() diff --git a/staging/src/k8s.io/kube-aggregator/main.go b/staging/src/k8s.io/kube-aggregator/main.go index ed8d45f6a35..863a8ea72b7 100644 --- a/staging/src/k8s.io/kube-aggregator/main.go +++ b/staging/src/k8s.io/kube-aggregator/main.go @@ -32,9 +32,9 @@ import ( ) func main() { - stopCh := genericapiserver.SetupSignalHandler() + ctx := genericapiserver.SetupSignalContext() options := server.NewDefaultOptions(os.Stdout, os.Stderr) - cmd := server.NewCommandStartAggregator(options, stopCh) + cmd := server.NewCommandStartAggregator(ctx, options) code := cli.Run(cmd) os.Exit(code) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index cedc9743538..16ab2bdfa6a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -115,7 +115,7 @@ type CompletedConfig struct { } type runnable interface { - Run(stopCh <-chan struct{}) error + RunWithContext(ctx context.Context) error } // preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked. @@ -479,8 +479,8 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil } -func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error { - return s.runnable.Run(stopCh) +func (s preparedAPIAggregator) Run(ctx context.Context) error { + return s.runnable.RunWithContext(ctx) } // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index 88abac45418..d4ddf9490ae 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "errors" "fmt" "io" @@ -55,7 +56,7 @@ type AggregatorOptions struct { // NewCommandStartAggregator provides a CLI handler for 'start master' command // with a default AggregatorOptions. -func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct{}) *cobra.Command { +func NewCommandStartAggregator(ctx context.Context, defaults *AggregatorOptions) *cobra.Command { o := *defaults cmd := &cobra.Command{ Short: "Launch a API aggregator and proxy server", @@ -67,12 +68,13 @@ func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct if err := o.Validate(args); err != nil { return err } - if err := o.RunAggregator(stopCh); err != nil { + if err := o.RunAggregator(c.Context()); err != nil { return err } return nil }, } + cmd.SetContext(ctx) o.AddFlags(cmd.Flags()) return cmd @@ -119,7 +121,7 @@ func (o *AggregatorOptions) Complete() error { } // RunAggregator runs the API Aggregator. -func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { +func (o AggregatorOptions) RunAggregator(ctx context.Context) error { // TODO have a "real" external address if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, nil); err != nil { return fmt.Errorf("error creating self-signed certificates: %v", err) @@ -171,5 +173,5 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { if err != nil { return err } - return prepared.Run(stopCh) + return prepared.Run(ctx) } diff --git a/staging/src/k8s.io/sample-apiserver/main.go b/staging/src/k8s.io/sample-apiserver/main.go index 708a2a43125..7dd8718c804 100644 --- a/staging/src/k8s.io/sample-apiserver/main.go +++ b/staging/src/k8s.io/sample-apiserver/main.go @@ -25,9 +25,9 @@ import ( ) func main() { - stopCh := genericapiserver.SetupSignalHandler() + ctx := genericapiserver.SetupSignalContext() options := server.NewWardleServerOptions(os.Stdout, os.Stderr) - cmd := server.NewCommandStartWardleServer(options, stopCh) + cmd := server.NewCommandStartWardleServer(ctx, options) code := cli.Run(cmd) os.Exit(code) } diff --git a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go index c39ef3e7bbf..d6310425a50 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "fmt" "io" "net" @@ -71,7 +72,7 @@ func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions { // NewCommandStartWardleServer provides a CLI handler for 'start master' command // with a default WardleServerOptions. -func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan struct{}) *cobra.Command { +func NewCommandStartWardleServer(ctx context.Context, defaults *WardleServerOptions) *cobra.Command { o := *defaults cmd := &cobra.Command{ Short: "Launch a wardle API server", @@ -83,12 +84,13 @@ func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan st if err := o.Validate(args); err != nil { return err } - if err := o.RunWardleServer(stopCh); err != nil { + if err := o.RunWardleServer(c.Context()); err != nil { return err } return nil }, } + cmd.SetContext(ctx) flags := cmd.Flags() o.RecommendedOptions.AddFlags(flags) @@ -154,7 +156,7 @@ func (o *WardleServerOptions) Config() (*apiserver.Config, error) { } // RunWardleServer starts a new WardleServer given WardleServerOptions -func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error { +func (o WardleServerOptions) RunWardleServer(ctx context.Context) error { config, err := o.Config() if err != nil { return err @@ -171,5 +173,5 @@ func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error { return nil }) - return server.GenericAPIServer.PrepareRun().Run(stopCh) + return server.GenericAPIServer.PrepareRun().RunWithContext(ctx) } diff --git a/test/e2e_node/services/apiserver.go b/test/e2e_node/services/apiserver.go index f8fe58bcea0..4488715b3bd 100644 --- a/test/e2e_node/services/apiserver.go +++ b/test/e2e_node/services/apiserver.go @@ -17,6 +17,8 @@ limitations under the License. package services import ( + "context" + "errors" "fmt" "os" "testing" @@ -45,19 +47,20 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0 // APIServer is a server which manages apiserver. type APIServer struct { storageConfig storagebackend.Config - stopCh chan struct{} + cancel func(error) } // NewAPIServer creates an apiserver. func NewAPIServer(storageConfig storagebackend.Config) *APIServer { return &APIServer{ storageConfig: storageConfig, - stopCh: make(chan struct{}), } } // Start starts the apiserver, returns when apiserver is ready. -func (a *APIServer) Start() error { +// The background goroutine runs until the context is canceled +// or Stop is called, whether happens first. +func (a *APIServer) Start(ctx context.Context) error { const tokenFilePath = "known_tokens.csv" o := options.NewServerRunOptions() @@ -93,9 +96,12 @@ func (a *APIServer) Start() error { o.KubeletConfig.PreferredAddressTypes = []string{"InternalIP"} + ctx, cancel := context.WithCancelCause(ctx) + a.cancel = cancel errCh := make(chan error) go func() { defer close(errCh) + defer cancel(errors.New("shutting down")) // Calling Stop is optional, but cancel always should be invoked. completedOptions, err := o.Complete() if err != nil { errCh <- fmt.Errorf("set apiserver default options error: %w", err) @@ -106,7 +112,7 @@ func (a *APIServer) Start() error { return } - err = apiserver.Run(completedOptions, a.stopCh) + err = apiserver.Run(ctx, completedOptions) if err != nil { errCh <- fmt.Errorf("run apiserver error: %w", err) return @@ -120,12 +126,11 @@ func (a *APIServer) Start() error { return nil } -// Stop stops the apiserver. Currently, there is no way to stop the apiserver. -// The function is here only for completion. +// Stop stops the apiserver. Does not block. func (a *APIServer) Stop() error { - if a.stopCh != nil { - close(a.stopCh) - a.stopCh = nil + // nil when Start has never been called. + if a.cancel != nil { + a.cancel(errors.New("stopping API server")) } return nil } diff --git a/test/e2e_node/services/internal_services.go b/test/e2e_node/services/internal_services.go index 06e50ec9b2f..f2254d4ff7e 100644 --- a/test/e2e_node/services/internal_services.go +++ b/test/e2e_node/services/internal_services.go @@ -24,8 +24,8 @@ import ( etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" "k8s.io/apiserver/pkg/storage/storagebackend" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/klog/v2" ) @@ -58,17 +58,17 @@ func (es *e2eServices) run(t *testing.T) error { // start starts the tests embedded services or returns an error. func (es *e2eServices) start(t *testing.T) error { - _, ctx := ktesting.NewTestContext(t) + tCtx := ktesting.Init(t) klog.Info("Starting e2e services...") err := es.startEtcd(t) if err != nil { return err } - err = es.startAPIServer(es.etcdStorage) + err = es.startAPIServer(tCtx, es.etcdStorage) if err != nil { return err } - err = es.startNamespaceController(ctx) + err = es.startNamespaceController(tCtx) if err != nil { return nil } @@ -121,10 +121,10 @@ func (es *e2eServices) startEtcd(t *testing.T) error { } // startAPIServer starts the embedded API server or returns an error. -func (es *e2eServices) startAPIServer(etcdStorage *storagebackend.Config) error { +func (es *e2eServices) startAPIServer(ctx context.Context, etcdStorage *storagebackend.Config) error { klog.Info("Starting API server") es.apiServer = NewAPIServer(*etcdStorage) - return es.apiServer.Start() + return es.apiServer.Start(ctx) } // startNamespaceController starts the embedded namespace controller or returns an error. diff --git a/test/integration/controlplane/transformation/secrets_transformation_test.go b/test/integration/controlplane/transformation/secrets_transformation_test.go index 53a62739ae3..d554cc50f9e 100644 --- a/test/integration/controlplane/transformation/secrets_transformation_test.go +++ b/test/integration/controlplane/transformation/secrets_transformation_test.go @@ -94,7 +94,7 @@ func TestSecretsShouldBeTransformed(t *testing.T) { if err != nil { t.Fatalf("Failed to create test secret, error: %v", err) } - test.runResource(test.logger, tt.unSealFunc, tt.transformerPrefix, "", "v1", "secrets", test.secret.Name, test.secret.Namespace) + test.runResource(test.TContext, tt.unSealFunc, tt.transformerPrefix, "", "v1", "secrets", test.secret.Name, test.secret.Namespace) test.cleanUp() } } diff --git a/test/integration/controlplane/transformation/transformation_test.go b/test/integration/controlplane/transformation/transformation_test.go index f4caa8e6974..fa9b74157ef 100644 --- a/test/integration/controlplane/transformation/transformation_test.go +++ b/test/integration/controlplane/transformation/transformation_test.go @@ -50,6 +50,7 @@ import ( "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/pointer" "sigs.k8s.io/yaml" ) @@ -75,7 +76,7 @@ const ( type unSealSecret func(ctx context.Context, cipherText []byte, dataCtx value.Context, config apiserverv1.ProviderConfiguration) ([]byte, error) type transformTest struct { - logger kubeapiservertesting.Logger + ktesting.TContext storageConfig *storagebackend.Config configDir string transformerConfig string @@ -85,12 +86,13 @@ type transformTest struct { secret *corev1.Secret } -func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML string, reload bool, configDir string, storageConfig *storagebackend.Config) (*transformTest, error) { +func newTransformTest(tb testing.TB, transformerConfigYAML string, reload bool, configDir string, storageConfig *storagebackend.Config) (*transformTest, error) { + tCtx := ktesting.Init(tb) if storageConfig == nil { storageConfig = framework.SharedEtcd() } e := transformTest{ - logger: l, + TContext: tCtx, transformerConfig: transformerConfigYAML, storageConfig: storageConfig, } @@ -113,7 +115,7 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin return nil, fmt.Errorf("failed to read config file: %w", err) } - if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, nil, e.getEncryptionOptions(reload), e.storageConfig); err != nil { + if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(tb, nil, e.getEncryptionOptions(reload), e.storageConfig); err != nil { e.cleanUp() return nil, fmt.Errorf("failed to start KubeAPI server: %w", err) } @@ -131,11 +133,11 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin if transformerConfigYAML != "" && reload { // when reloading is enabled, this healthz endpoint is always present - mustBeHealthy(l, "/kms-providers", "ok", e.kubeAPIServer.ClientConfig) - mustNotHaveLivez(l, "/kms-providers", "404 page not found", e.kubeAPIServer.ClientConfig) + mustBeHealthy(tCtx, "/kms-providers", "ok", e.kubeAPIServer.ClientConfig) + mustNotHaveLivez(tCtx, "/kms-providers", "404 page not found", e.kubeAPIServer.ClientConfig) // excluding healthz endpoints even if they do not exist should work - mustBeHealthy(l, "", `warn: some health checks cannot be excluded: no matches for "kms-provider-0","kms-provider-1","kms-provider-2","kms-provider-3"`, + mustBeHealthy(tCtx, "", `warn: some health checks cannot be excluded: no matches for "kms-provider-0","kms-provider-1","kms-provider-2","kms-provider-3"`, e.kubeAPIServer.ClientConfig, "kms-provider-0", "kms-provider-1", "kms-provider-2", "kms-provider-3") } @@ -530,7 +532,7 @@ func (e *transformTest) writeRawRecordToETCD(path string, data []byte) (*clientv } func (e *transformTest) printMetrics() error { - e.logger.Logf("Transformation Metrics:") + e.Logf("Transformation Metrics:") metrics, err := legacyregistry.DefaultGatherer.Gather() if err != nil { return fmt.Errorf("failed to gather metrics: %s", err) @@ -538,9 +540,9 @@ func (e *transformTest) printMetrics() error { for _, mf := range metrics { if strings.HasPrefix(*mf.Name, metricsPrefix) { - e.logger.Logf("%s", *mf.Name) + e.Logf("%s", *mf.Name) for _, metric := range mf.GetMetric() { - e.logger.Logf("%v", metric) + e.Logf("%v", metric) } } } diff --git a/test/integration/etcd/server.go b/test/integration/etcd/server.go index 2a70e79919e..ed69e3d532f 100644 --- a/test/integration/etcd/server.go +++ b/test/integration/etcd/server.go @@ -49,6 +49,7 @@ import ( "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/utils/ktesting" netutils "k8s.io/utils/net" // install all APIs @@ -64,6 +65,8 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0 // StartRealAPIServerOrDie starts an API server that is appropriate for use in tests that require one of every resource func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOptions)) *APIServer { + tCtx := ktesting.Init(t) + certDir, err := os.MkdirTemp("", t.Name()) if err != nil { t.Fatal(err) @@ -148,7 +151,6 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu kubeClient := clientset.NewForConfigOrDie(kubeClientConfig) - stopCh := make(chan struct{}) errCh := make(chan error) go func() { // Catch panics that occur in this go routine so we get a comprehensible failure @@ -164,7 +166,7 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu errCh <- err return } - if err := prepared.Run(stopCh); err != nil { + if err := prepared.Run(tCtx); err != nil { errCh <- err t.Error(err) return @@ -215,9 +217,9 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu } cleanup := func() { - // Closing stopCh is stopping apiserver and cleaning up + // Cancel stopping apiserver and cleaning up // after itself, including shutting down its storage layer. - close(stopCh) + tCtx.Cancel("cleaning up") // If the apiserver was started, let's wait for it to // shutdown clearly. diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 5465ac69bb2..a0c3852618b 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -62,9 +62,6 @@ func TestAPIServiceWaitOnStart(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) t.Cleanup(cancel) - stopCh := make(chan struct{}) - defer close(stopCh) - etcdConfig := framework.SharedEtcd() etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport) @@ -235,9 +232,6 @@ func TestAggregatedAPIServer(t *testing.T) { // makes the kube-apiserver very responsive. it's normally a minute dynamiccertificates.FileRefreshDuration = 1 * time.Second - stopCh := make(chan struct{}) - defer close(stopCh) - // we need the wardle port information first to set up the service resolver listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{}) if err != nil { @@ -291,7 +285,7 @@ func TestAggregatedAPIServer(t *testing.T) { } o.RecommendedOptions.SecureServing.Listener = listener o.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1") - wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh) + wardleCmd := sampleserver.NewCommandStartWardleServer(ctx, o) wardleCmd.SetArgs([]string{ "--authentication-kubeconfig", wardleToKASKubeConfigFile, "--authorization-kubeconfig", wardleToKASKubeConfigFile, diff --git a/test/integration/framework/test_server.go b/test/integration/framework/test_server.go index 353dfd3f756..e1d2c64efa9 100644 --- a/test/integration/framework/test_server.go +++ b/test/integration/framework/test_server.go @@ -176,7 +176,7 @@ func StartTestServer(ctx context.Context, t testing.TB, setup TestServerSetup) ( errCh = make(chan error) go func() { defer close(errCh) - if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().Run(ctx.Done()); err != nil { + if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil { errCh <- err } }()