apiserver + controllers: enhance context support

27a68aee3a introduced context support for events. Creating an event
broadcaster with context makes tests more resilient against leaking goroutines
when that context gets canceled at the end of a test and enables per-test
output via ktesting.

The context could get passed to the constructor. A cleaner solution is to
enhance context support for the apiserver and then pass the context into the
controller's run method. This ripples up the call stack to all places which
start an apiserver.
This commit is contained in:
Patrick Ohly 2023-12-01 09:00:59 +01:00
parent 591855966c
commit b92273a760
25 changed files with 197 additions and 122 deletions

View File

@ -20,6 +20,7 @@ limitations under the License.
package app
import (
"context"
"crypto/tls"
"fmt"
"net/http"
@ -114,7 +115,7 @@ cluster's shared state through which all other components interact.`,
}
// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
return Run(completedOptions, genericapiserver.SetupSignalHandler())
return Run(cmd.Context(), completedOptions)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
@ -125,6 +126,7 @@ cluster's shared state through which all other components interact.`,
return nil
},
}
cmd.SetContext(genericapiserver.SetupSignalContext())
fs := cmd.Flags()
namedFlagSets := s.Flags()
@ -142,7 +144,7 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
func Run(ctx context.Context, opts options.CompletedOptions) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
@ -166,7 +168,7 @@ func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
return err
}
return prepared.Run(stopCh)
return prepared.Run(ctx)
}
// CreateServerChain creates the apiservers connected via delegation.

View File

@ -32,6 +32,7 @@ import (
"os"
"path/filepath"
"runtime"
"testing"
"time"
"github.com/spf13/pflag"
@ -56,10 +57,11 @@ import (
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/features"
testutil "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
testutil "k8s.io/kubernetes/test/utils"
)
func init() {
@ -139,7 +141,9 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
tCtx := ktesting.Init(t)
if instanceOptions == nil {
instanceOptions = NewDefaultTestServerOptions()
}
@ -149,12 +153,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
return result, fmt.Errorf("failed to create temp dir: %v", err)
}
stopCh := make(chan struct{})
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiserver and cleaning up
// Cancel is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
tCtx.Cancel("tearing down")
// If the apiserver was started, let's wait for it to
// shutdown clearly.
@ -359,15 +362,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
errCh = make(chan error)
go func(stopCh <-chan struct{}) {
go func() {
defer close(errCh)
prepared, err := server.PrepareRun()
if err != nil {
errCh <- err
} else if err := prepared.Run(stopCh); err != nil {
} else if err := prepared.Run(tCtx); err != nil {
errCh <- err
}
}(stopCh)
}()
client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
@ -465,7 +468,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
if err == nil {
return &result

View File

@ -55,9 +55,6 @@ func NewController(
secondaryRange net.IPNet,
client clientset.Interface,
) *Controller {
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
c := &Controller{
client: client,
interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval
@ -79,9 +76,6 @@ func NewController(
c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer())
c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced
c.eventBroadcaster = broadcaster
c.eventRecorder = recorder
return c
}
@ -101,9 +95,12 @@ type Controller struct {
}
// Start will not return until the default ServiceCIDR exists or stopCh is closed.
func (c *Controller) Start(stopCh <-chan struct{}) {
func (c *Controller) Start(ctx context.Context) {
defer utilruntime.HandleCrash()
stopCh := ctx.Done()
c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
c.eventBroadcaster.StartStructuredLogging(0)
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
@ -116,8 +113,6 @@ func (c *Controller) Start(stopCh <-chan struct{}) {
return
}
// derive a context from the stopCh so we can cancel the poll loop
ctx := wait.ContextForChannel(stopCh)
// wait until first successfully sync
// this blocks apiserver startup so poll with a short interval
err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {

View File

@ -420,7 +420,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
)
// The default serviceCIDR must exist before the apiserver is healthy
// otherwise the allocators for Services will not work.
controller.Start(hookContext.StopCh)
controller.Start(hookContext)
return nil
})
}

View File

@ -25,8 +25,8 @@ import (
)
func main() {
stopCh := genericapiserver.SetupSignalHandler()
cmd := server.NewServerCommand(os.Stdout, os.Stderr, stopCh)
ctx := genericapiserver.SetupSignalContext()
cmd := server.NewServerCommand(ctx, os.Stdout, os.Stderr)
code := cli.Run(cmd)
os.Exit(code)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"io"
"github.com/spf13/cobra"
@ -25,7 +26,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
)
func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command {
func NewServerCommand(ctx context.Context, out, errOut io.Writer) *cobra.Command {
o := options.NewCustomResourceDefinitionsServerOptions(out, errOut)
cmd := &cobra.Command{
@ -38,19 +39,20 @@ func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Comm
if err := o.Validate(); err != nil {
return err
}
if err := Run(o, stopCh); err != nil {
if err := Run(c.Context(), o); err != nil {
return err
}
return nil
},
}
cmd.SetContext(ctx)
fs := cmd.Flags()
o.AddFlags(fs)
return cmd
}
func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct{}) error {
func Run(ctx context.Context, o *options.CustomResourceDefinitionsServerOptions) error {
config, err := o.Config()
if err != nil {
return err
@ -60,5 +62,5 @@ func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct
if err != nil {
return err
}
return server.GenericAPIServer.PrepareRun().Run(stopCh)
return server.GenericAPIServer.PrepareRun().RunWithContext(ctx)
}

View File

@ -18,6 +18,7 @@ package testing
import (
"context"
"errors"
"fmt"
"net"
"os"
@ -83,13 +84,15 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
stopCh := make(chan struct{})
// TODO: this is a candidate for using what is now test/utils/ktesting,
// should that become a staging repo.
ctx, cancel := context.WithCancelCause(context.Background())
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiextensions apiserver and its
// Cancel is stopping apiextensions apiserver and its
// delegates, which itself is cleaning up after itself,
// including shutting down its storage layer.
close(stopCh)
cancel(errors.New("tearing down"))
// If the apiextensions apiserver was started, let's wait for
// it to shutdown clearly.
@ -166,13 +169,13 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
}
errCh = make(chan error)
go func(stopCh <-chan struct{}) {
go func() {
defer close(errCh)
if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
if err := server.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil {
errCh <- err
}
}(stopCh)
}()
t.Logf("Waiting for /healthz to be ok...")

View File

@ -17,6 +17,8 @@ limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@ -42,6 +44,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)
@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
}
func TestNewWithDelegate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("test is done"))
delegateConfig := NewConfig(codecs)
delegateConfig.ExternalAddress = "192.168.10.4:443"
delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4")
@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) {
return nil
})
stopCh := make(chan struct{})
defer close(stopCh)
wrappingServer.PrepareRun()
wrappingServer.RunPostStartHooks(stopCh)
wrappingServer.RunPostStartHooks(ctx)
server := httptest.NewServer(wrappingServer.Handler)
defer server.Close()

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/managedfields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/admission"
@ -442,9 +443,19 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
// This is the diagram of what channels/signals are dependent on each other:
//
// | stopCh
// Deprecated: use RunWithContext instead. Run will not get removed to avoid
// breaking consumers, but should not be used in new code.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
ctx := wait.ContextForChannel(stopCh)
return s.RunWithContext(ctx)
}
// RunWithContext spawns the secure http server. It only returns if ctx is canceled
// or the secure port cannot be listened on initially.
// This is the diagram of what contexts/channels/signals are dependent on each other:
//
// | ctx
// | |
// | ---------------------------------------------------------
// | | |
@ -477,12 +488,13 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// | | | |
// | |-------------------|---------------------|----------------------------------------|
// | | |
// | stopHttpServerCh (AuditBackend::Shutdown())
// | stopHttpServerCtx (AuditBackend::Shutdown())
// | |
// | listenerStoppedCh
// | |
// | HTTPServerStoppedListening (httpServerStoppedListeningCh)
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error {
stopCh := ctx.Done()
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
@ -544,9 +556,11 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := make(chan struct{})
// Canceling the parent context does not immediately cancel the HTTP server.
// We only inherit context values here and deal with cancellation ourselves.
stopHTTPServerCtx, stopHTTPServer := context.WithCancelCause(context.WithoutCancel(ctx))
go func() {
defer close(stopHttpServerCh)
defer stopHTTPServer(errors.New("time to stop HTTP server"))
timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
if s.ShutdownSendRetryAfter {
@ -565,7 +579,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
}
}
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
stoppedCh, listenerStoppedCh, err := s.NonBlockingRunWithContext(stopHTTPServerCtx, shutdownTimeout)
if err != nil {
return err
}
@ -694,7 +708,18 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
//
// Deprecated: use RunWithContext instead. Run will not get removed to avoid
// breaking consumers, but should not be used in new code.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
ctx := wait.ContextForChannel(stopCh)
return s.NonBlockingRunWithContext(ctx, shutdownTimeout)
}
// NonBlockingRunWithContext spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
func (s preparedGenericAPIServer) NonBlockingRunWithContext(ctx context.Context, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
// Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
@ -712,11 +737,11 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow
// responsibility of the caller to close the provided channel to
// ensure cleanup.
go func() {
<-stopCh
<-ctx.Done()
close(internalStopCh)
}()
s.RunPostStartHooks(stopCh)
s.RunPostStartHooks(ctx)
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)

View File

@ -20,6 +20,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"log"
@ -41,6 +42,7 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
@ -200,10 +202,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
}, nil)
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -222,7 +229,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
}
// signal termination event: initiate a shutdown
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntilSignaled(t, signals.ShutdownInitiated)
// /readyz must return an error, but we need to give it some time
@ -423,10 +430,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
}, nil)
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -445,7 +457,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
}
// signal termination event: initiate a shutdown
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntilSignaled(t, signals.ShutdownInitiated)
// /readyz must return an error, but we need to give it some time
@ -568,10 +580,15 @@ func TestMuxAndDiscoveryComplete(t *testing.T) {
}
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -612,6 +629,9 @@ func TestPreShutdownHooks(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
s := test.server()
doer := setupDoer(t, s.SecureServingInfo)
@ -643,14 +663,16 @@ func TestPreShutdownHooks(t *testing.T) {
}
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return")

View File

@ -52,6 +52,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2/ktesting"
kubeopenapi "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/validation/spec"
netutils "k8s.io/utils/net"
@ -317,17 +318,16 @@ func TestInstallAPIGroups(t *testing.T) {
}
func TestPrepareRun(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
s, config, assert := newMaster(t)
assert.NotNil(config.OpenAPIConfig)
server := httptest.NewServer(s.Handler.Director)
defer server.Close()
done := make(chan struct{})
defer close(done)
s.PrepareRun()
s.RunPostStartHooks(done)
s.RunPostStartHooks(ctx)
// openapi is installed in PrepareRun
resp, err := http.Get(server.URL + "/openapi/v2")
@ -353,9 +353,10 @@ func TestPrepareRun(t *testing.T) {
}
func TestUpdateOpenAPISpec(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
s, _, assert := newMaster(t)
s.PrepareRun()
s.RunPostStartHooks(make(chan struct{}))
s.RunPostStartHooks(ctx)
server := httptest.NewServer(s.Handler.Director)
defer server.Close()

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"net/http"
@ -48,8 +49,13 @@ type PreShutdownHookFunc func() error
type PostStartHookContext struct {
// LoopbackClientConfig is a config for a privileged loopback connection to the API server
LoopbackClientConfig *restclient.Config
// StopCh is the channel that will be closed when the server stops
// StopCh is the channel that will be closed when the server stops.
//
// Deprecated: use the PostStartHookContext itself instead, it contains a context that
// gets cancelled when the server stops. StopCh keeps getting provided for existing code.
StopCh <-chan struct{}
// Context gets cancelled when the server stops.
context.Context
}
// PostStartHookProvider is an interface in addition to provide a post start hook for the api server
@ -151,15 +157,16 @@ func (s *GenericAPIServer) AddPreShutdownHookOrDie(name string, hook PreShutdown
}
}
// RunPostStartHooks runs the PostStartHooks for the server
func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
// RunPostStartHooks runs the PostStartHooks for the server.
func (s *GenericAPIServer) RunPostStartHooks(ctx context.Context) {
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
s.postStartHooksCalled = true
context := PostStartHookContext{
LoopbackClientConfig: s.LoopbackClientConfig,
StopCh: stopCh,
StopCh: ctx.Done(),
Context: ctx,
}
for hookName, hookEntry := range s.postStartHooks {

View File

@ -18,6 +18,7 @@ package options
import (
"bytes"
"context"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/tls"
@ -25,6 +26,7 @@ import (
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
"math/big"
@ -44,6 +46,7 @@ import (
"k8s.io/client-go/discovery"
restclient "k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)
@ -215,6 +218,10 @@ func TestServerRunWithSNI(t *testing.T) {
test := tests[title]
t.Run(title, func(t *testing.T) {
t.Parallel()
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("test has completed"))
// create server cert
certDir := "testdata/" + specToName(test.Cert)
serverCertBundleFile := filepath.Join(certDir, "cert")
@ -267,9 +274,6 @@ func TestServerRunWithSNI(t *testing.T) {
signatures[sig] = j
}
stopCh := make(chan struct{})
defer close(stopCh)
// launch server
config := setUp(t)
@ -316,7 +320,7 @@ func TestServerRunWithSNI(t *testing.T) {
preparedServer := s.PrepareRun()
preparedServerErrors := make(chan error)
go func() {
if err := preparedServer.Run(stopCh); err != nil {
if err := preparedServer.RunWithContext(ctx); err != nil {
preparedServerErrors <- err
}
}()

View File

@ -32,9 +32,9 @@ import (
)
func main() {
stopCh := genericapiserver.SetupSignalHandler()
ctx := genericapiserver.SetupSignalContext()
options := server.NewDefaultOptions(os.Stdout, os.Stderr)
cmd := server.NewCommandStartAggregator(options, stopCh)
cmd := server.NewCommandStartAggregator(ctx, options)
code := cli.Run(cmd)
os.Exit(code)
}

View File

@ -115,7 +115,7 @@ type CompletedConfig struct {
}
type runnable interface {
Run(stopCh <-chan struct{}) error
RunWithContext(ctx context.Context) error
}
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
@ -479,8 +479,8 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
func (s preparedAPIAggregator) Run(ctx context.Context) error {
return s.runnable.RunWithContext(ctx)
}
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"io"
@ -55,7 +56,7 @@ type AggregatorOptions struct {
// NewCommandStartAggregator provides a CLI handler for 'start master' command
// with a default AggregatorOptions.
func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct{}) *cobra.Command {
func NewCommandStartAggregator(ctx context.Context, defaults *AggregatorOptions) *cobra.Command {
o := *defaults
cmd := &cobra.Command{
Short: "Launch a API aggregator and proxy server",
@ -67,12 +68,13 @@ func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunAggregator(stopCh); err != nil {
if err := o.RunAggregator(c.Context()); err != nil {
return err
}
return nil
},
}
cmd.SetContext(ctx)
o.AddFlags(cmd.Flags())
return cmd
@ -119,7 +121,7 @@ func (o *AggregatorOptions) Complete() error {
}
// RunAggregator runs the API Aggregator.
func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
func (o AggregatorOptions) RunAggregator(ctx context.Context) error {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, nil); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err)
@ -171,5 +173,5 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
if err != nil {
return err
}
return prepared.Run(stopCh)
return prepared.Run(ctx)
}

View File

@ -25,9 +25,9 @@ import (
)
func main() {
stopCh := genericapiserver.SetupSignalHandler()
ctx := genericapiserver.SetupSignalContext()
options := server.NewWardleServerOptions(os.Stdout, os.Stderr)
cmd := server.NewCommandStartWardleServer(options, stopCh)
cmd := server.NewCommandStartWardleServer(ctx, options)
code := cli.Run(cmd)
os.Exit(code)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"fmt"
"io"
"net"
@ -71,7 +72,7 @@ func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions {
// NewCommandStartWardleServer provides a CLI handler for 'start master' command
// with a default WardleServerOptions.
func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan struct{}) *cobra.Command {
func NewCommandStartWardleServer(ctx context.Context, defaults *WardleServerOptions) *cobra.Command {
o := *defaults
cmd := &cobra.Command{
Short: "Launch a wardle API server",
@ -83,12 +84,13 @@ func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan st
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunWardleServer(stopCh); err != nil {
if err := o.RunWardleServer(c.Context()); err != nil {
return err
}
return nil
},
}
cmd.SetContext(ctx)
flags := cmd.Flags()
o.RecommendedOptions.AddFlags(flags)
@ -154,7 +156,7 @@ func (o *WardleServerOptions) Config() (*apiserver.Config, error) {
}
// RunWardleServer starts a new WardleServer given WardleServerOptions
func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error {
func (o WardleServerOptions) RunWardleServer(ctx context.Context) error {
config, err := o.Config()
if err != nil {
return err
@ -171,5 +173,5 @@ func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error {
return nil
})
return server.GenericAPIServer.PrepareRun().Run(stopCh)
return server.GenericAPIServer.PrepareRun().RunWithContext(ctx)
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package services
import (
"context"
"errors"
"fmt"
"os"
"testing"
@ -45,19 +47,20 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0
// APIServer is a server which manages apiserver.
type APIServer struct {
storageConfig storagebackend.Config
stopCh chan struct{}
cancel func(error)
}
// NewAPIServer creates an apiserver.
func NewAPIServer(storageConfig storagebackend.Config) *APIServer {
return &APIServer{
storageConfig: storageConfig,
stopCh: make(chan struct{}),
}
}
// Start starts the apiserver, returns when apiserver is ready.
func (a *APIServer) Start() error {
// The background goroutine runs until the context is canceled
// or Stop is called, whether happens first.
func (a *APIServer) Start(ctx context.Context) error {
const tokenFilePath = "known_tokens.csv"
o := options.NewServerRunOptions()
@ -93,9 +96,12 @@ func (a *APIServer) Start() error {
o.KubeletConfig.PreferredAddressTypes = []string{"InternalIP"}
ctx, cancel := context.WithCancelCause(ctx)
a.cancel = cancel
errCh := make(chan error)
go func() {
defer close(errCh)
defer cancel(errors.New("shutting down")) // Calling Stop is optional, but cancel always should be invoked.
completedOptions, err := o.Complete()
if err != nil {
errCh <- fmt.Errorf("set apiserver default options error: %w", err)
@ -106,7 +112,7 @@ func (a *APIServer) Start() error {
return
}
err = apiserver.Run(completedOptions, a.stopCh)
err = apiserver.Run(ctx, completedOptions)
if err != nil {
errCh <- fmt.Errorf("run apiserver error: %w", err)
return
@ -120,12 +126,11 @@ func (a *APIServer) Start() error {
return nil
}
// Stop stops the apiserver. Currently, there is no way to stop the apiserver.
// The function is here only for completion.
// Stop stops the apiserver. Does not block.
func (a *APIServer) Stop() error {
if a.stopCh != nil {
close(a.stopCh)
a.stopCh = nil
// nil when Start has never been called.
if a.cancel != nil {
a.cancel(errors.New("stopping API server"))
}
return nil
}

View File

@ -24,8 +24,8 @@ import (
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/klog/v2"
)
@ -58,17 +58,17 @@ func (es *e2eServices) run(t *testing.T) error {
// start starts the tests embedded services or returns an error.
func (es *e2eServices) start(t *testing.T) error {
_, ctx := ktesting.NewTestContext(t)
tCtx := ktesting.Init(t)
klog.Info("Starting e2e services...")
err := es.startEtcd(t)
if err != nil {
return err
}
err = es.startAPIServer(es.etcdStorage)
err = es.startAPIServer(tCtx, es.etcdStorage)
if err != nil {
return err
}
err = es.startNamespaceController(ctx)
err = es.startNamespaceController(tCtx)
if err != nil {
return nil
}
@ -121,10 +121,10 @@ func (es *e2eServices) startEtcd(t *testing.T) error {
}
// startAPIServer starts the embedded API server or returns an error.
func (es *e2eServices) startAPIServer(etcdStorage *storagebackend.Config) error {
func (es *e2eServices) startAPIServer(ctx context.Context, etcdStorage *storagebackend.Config) error {
klog.Info("Starting API server")
es.apiServer = NewAPIServer(*etcdStorage)
return es.apiServer.Start()
return es.apiServer.Start(ctx)
}
// startNamespaceController starts the embedded namespace controller or returns an error.

View File

@ -94,7 +94,7 @@ func TestSecretsShouldBeTransformed(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create test secret, error: %v", err)
}
test.runResource(test.logger, tt.unSealFunc, tt.transformerPrefix, "", "v1", "secrets", test.secret.Name, test.secret.Namespace)
test.runResource(test.TContext, tt.unSealFunc, tt.transformerPrefix, "", "v1", "secrets", test.secret.Name, test.secret.Namespace)
test.cleanUp()
}
}

View File

@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"
)
@ -75,7 +76,7 @@ const (
type unSealSecret func(ctx context.Context, cipherText []byte, dataCtx value.Context, config apiserverv1.ProviderConfiguration) ([]byte, error)
type transformTest struct {
logger kubeapiservertesting.Logger
ktesting.TContext
storageConfig *storagebackend.Config
configDir string
transformerConfig string
@ -85,12 +86,13 @@ type transformTest struct {
secret *corev1.Secret
}
func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML string, reload bool, configDir string, storageConfig *storagebackend.Config) (*transformTest, error) {
func newTransformTest(tb testing.TB, transformerConfigYAML string, reload bool, configDir string, storageConfig *storagebackend.Config) (*transformTest, error) {
tCtx := ktesting.Init(tb)
if storageConfig == nil {
storageConfig = framework.SharedEtcd()
}
e := transformTest{
logger: l,
TContext: tCtx,
transformerConfig: transformerConfigYAML,
storageConfig: storageConfig,
}
@ -113,7 +115,7 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin
return nil, fmt.Errorf("failed to read config file: %w", err)
}
if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, nil, e.getEncryptionOptions(reload), e.storageConfig); err != nil {
if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(tb, nil, e.getEncryptionOptions(reload), e.storageConfig); err != nil {
e.cleanUp()
return nil, fmt.Errorf("failed to start KubeAPI server: %w", err)
}
@ -131,11 +133,11 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin
if transformerConfigYAML != "" && reload {
// when reloading is enabled, this healthz endpoint is always present
mustBeHealthy(l, "/kms-providers", "ok", e.kubeAPIServer.ClientConfig)
mustNotHaveLivez(l, "/kms-providers", "404 page not found", e.kubeAPIServer.ClientConfig)
mustBeHealthy(tCtx, "/kms-providers", "ok", e.kubeAPIServer.ClientConfig)
mustNotHaveLivez(tCtx, "/kms-providers", "404 page not found", e.kubeAPIServer.ClientConfig)
// excluding healthz endpoints even if they do not exist should work
mustBeHealthy(l, "", `warn: some health checks cannot be excluded: no matches for "kms-provider-0","kms-provider-1","kms-provider-2","kms-provider-3"`,
mustBeHealthy(tCtx, "", `warn: some health checks cannot be excluded: no matches for "kms-provider-0","kms-provider-1","kms-provider-2","kms-provider-3"`,
e.kubeAPIServer.ClientConfig, "kms-provider-0", "kms-provider-1", "kms-provider-2", "kms-provider-3")
}
@ -530,7 +532,7 @@ func (e *transformTest) writeRawRecordToETCD(path string, data []byte) (*clientv
}
func (e *transformTest) printMetrics() error {
e.logger.Logf("Transformation Metrics:")
e.Logf("Transformation Metrics:")
metrics, err := legacyregistry.DefaultGatherer.Gather()
if err != nil {
return fmt.Errorf("failed to gather metrics: %s", err)
@ -538,9 +540,9 @@ func (e *transformTest) printMetrics() error {
for _, mf := range metrics {
if strings.HasPrefix(*mf.Name, metricsPrefix) {
e.logger.Logf("%s", *mf.Name)
e.Logf("%s", *mf.Name)
for _, metric := range mf.GetMetric() {
e.logger.Logf("%v", metric)
e.Logf("%v", metric)
}
}
}

View File

@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
netutils "k8s.io/utils/net"
// install all APIs
@ -64,6 +65,8 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0
// StartRealAPIServerOrDie starts an API server that is appropriate for use in tests that require one of every resource
func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOptions)) *APIServer {
tCtx := ktesting.Init(t)
certDir, err := os.MkdirTemp("", t.Name())
if err != nil {
t.Fatal(err)
@ -148,7 +151,6 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)
stopCh := make(chan struct{})
errCh := make(chan error)
go func() {
// Catch panics that occur in this go routine so we get a comprehensible failure
@ -164,7 +166,7 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
errCh <- err
return
}
if err := prepared.Run(stopCh); err != nil {
if err := prepared.Run(tCtx); err != nil {
errCh <- err
t.Error(err)
return
@ -215,9 +217,9 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
}
cleanup := func() {
// Closing stopCh is stopping apiserver and cleaning up
// Cancel stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
tCtx.Cancel("cleaning up")
// If the apiserver was started, let's wait for it to
// shutdown clearly.

View File

@ -62,9 +62,6 @@ func TestAPIServiceWaitOnStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)
stopCh := make(chan struct{})
defer close(stopCh)
etcdConfig := framework.SharedEtcd()
etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport)
@ -235,9 +232,6 @@ func TestAggregatedAPIServer(t *testing.T) {
// makes the kube-apiserver very responsive. it's normally a minute
dynamiccertificates.FileRefreshDuration = 1 * time.Second
stopCh := make(chan struct{})
defer close(stopCh)
// we need the wardle port information first to set up the service resolver
listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{})
if err != nil {
@ -291,7 +285,7 @@ func TestAggregatedAPIServer(t *testing.T) {
}
o.RecommendedOptions.SecureServing.Listener = listener
o.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1")
wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh)
wardleCmd := sampleserver.NewCommandStartWardleServer(ctx, o)
wardleCmd.SetArgs([]string{
"--authentication-kubeconfig", wardleToKASKubeConfigFile,
"--authorization-kubeconfig", wardleToKASKubeConfigFile,

View File

@ -176,7 +176,7 @@ func StartTestServer(ctx context.Context, t testing.TB, setup TestServerSetup) (
errCh = make(chan error)
go func() {
defer close(errCh)
if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().Run(ctx.Done()); err != nil {
if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil {
errCh <- err
}
}()