Merge pull request #122148 from pohly/controllers-context-support

controllers + apiserver: enhance context support
This commit is contained in:
Kubernetes Prow Robot 2024-04-30 01:30:09 -07:00 committed by GitHub
commit d0fddf143b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 198 additions and 123 deletions

View File

@ -20,6 +20,7 @@ limitations under the License.
package app
import (
"context"
"crypto/tls"
"fmt"
"net/http"
@ -114,7 +115,7 @@ cluster's shared state through which all other components interact.`,
}
// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
return Run(completedOptions, genericapiserver.SetupSignalHandler())
return Run(cmd.Context(), completedOptions)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
@ -125,6 +126,7 @@ cluster's shared state through which all other components interact.`,
return nil
},
}
cmd.SetContext(genericapiserver.SetupSignalContext())
fs := cmd.Flags()
namedFlagSets := s.Flags()
@ -142,7 +144,7 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
func Run(ctx context.Context, opts options.CompletedOptions) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
@ -166,7 +168,7 @@ func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
return err
}
return prepared.Run(stopCh)
return prepared.Run(ctx)
}
// CreateServerChain creates the apiservers connected via delegation.

View File

@ -32,6 +32,7 @@ import (
"os"
"path/filepath"
"runtime"
"testing"
"time"
"github.com/spf13/pflag"
@ -56,10 +57,11 @@ import (
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/features"
testutil "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
testutil "k8s.io/kubernetes/test/utils"
)
func init() {
@ -139,7 +141,9 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
tCtx := ktesting.Init(t)
if instanceOptions == nil {
instanceOptions = NewDefaultTestServerOptions()
}
@ -149,12 +153,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
return result, fmt.Errorf("failed to create temp dir: %v", err)
}
stopCh := make(chan struct{})
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiserver and cleaning up
// Cancel is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
tCtx.Cancel("tearing down")
// If the apiserver was started, let's wait for it to
// shutdown clearly.
@ -359,15 +362,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
errCh = make(chan error)
go func(stopCh <-chan struct{}) {
go func() {
defer close(errCh)
prepared, err := server.PrepareRun()
if err != nil {
errCh <- err
} else if err := prepared.Run(stopCh); err != nil {
} else if err := prepared.Run(tCtx); err != nil {
errCh <- err
}
}(stopCh)
}()
client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
@ -465,7 +468,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
if err == nil {
return &result

View File

@ -55,9 +55,6 @@ func NewController(
secondaryRange net.IPNet,
client clientset.Interface,
) *Controller {
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
c := &Controller{
client: client,
interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval
@ -79,9 +76,6 @@ func NewController(
c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer())
c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced
c.eventBroadcaster = broadcaster
c.eventRecorder = recorder
return c
}
@ -101,9 +95,12 @@ type Controller struct {
}
// Start will not return until the default ServiceCIDR exists or stopCh is closed.
func (c *Controller) Start(stopCh <-chan struct{}) {
func (c *Controller) Start(ctx context.Context) {
defer utilruntime.HandleCrash()
stopCh := ctx.Done()
c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
c.eventBroadcaster.StartStructuredLogging(0)
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
@ -116,8 +113,6 @@ func (c *Controller) Start(stopCh <-chan struct{}) {
return
}
// derive a context from the stopCh so we can cancel the poll loop
ctx := wait.ContextForChannel(stopCh)
// wait until first successfully sync
// this blocks apiserver startup so poll with a short interval
err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {

View File

@ -420,7 +420,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
)
// The default serviceCIDR must exist before the apiserver is healthy
// otherwise the allocators for Services will not work.
controller.Start(hookContext.StopCh)
controller.Start(hookContext)
return nil
})
}

View File

@ -25,8 +25,8 @@ import (
)
func main() {
stopCh := genericapiserver.SetupSignalHandler()
cmd := server.NewServerCommand(os.Stdout, os.Stderr, stopCh)
ctx := genericapiserver.SetupSignalContext()
cmd := server.NewServerCommand(ctx, os.Stdout, os.Stderr)
code := cli.Run(cmd)
os.Exit(code)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"io"
"github.com/spf13/cobra"
@ -25,7 +26,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
)
func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command {
func NewServerCommand(ctx context.Context, out, errOut io.Writer) *cobra.Command {
o := options.NewCustomResourceDefinitionsServerOptions(out, errOut)
cmd := &cobra.Command{
@ -38,19 +39,20 @@ func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Comm
if err := o.Validate(); err != nil {
return err
}
if err := Run(o, stopCh); err != nil {
if err := Run(c.Context(), o); err != nil {
return err
}
return nil
},
}
cmd.SetContext(ctx)
fs := cmd.Flags()
o.AddFlags(fs)
return cmd
}
func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct{}) error {
func Run(ctx context.Context, o *options.CustomResourceDefinitionsServerOptions) error {
config, err := o.Config()
if err != nil {
return err
@ -60,5 +62,5 @@ func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct
if err != nil {
return err
}
return server.GenericAPIServer.PrepareRun().Run(stopCh)
return server.GenericAPIServer.PrepareRun().RunWithContext(ctx)
}

View File

@ -18,6 +18,7 @@ package testing
import (
"context"
"errors"
"fmt"
"net"
"os"
@ -83,13 +84,15 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
stopCh := make(chan struct{})
// TODO: this is a candidate for using what is now test/utils/ktesting,
// should that become a staging repo.
ctx, cancel := context.WithCancelCause(context.Background())
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiextensions apiserver and its
// Cancel is stopping apiextensions apiserver and its
// delegates, which itself is cleaning up after itself,
// including shutting down its storage layer.
close(stopCh)
cancel(errors.New("tearing down"))
// If the apiextensions apiserver was started, let's wait for
// it to shutdown clearly.
@ -166,13 +169,13 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
}
errCh = make(chan error)
go func(stopCh <-chan struct{}) {
go func() {
defer close(errCh)
if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
if err := server.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil {
errCh <- err
}
}(stopCh)
}()
t.Logf("Waiting for /healthz to be ok...")

View File

@ -17,6 +17,8 @@ limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@ -42,6 +44,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)
@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
}
func TestNewWithDelegate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("test is done"))
delegateConfig := NewConfig(codecs)
delegateConfig.ExternalAddress = "192.168.10.4:443"
delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4")
@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) {
return nil
})
stopCh := make(chan struct{})
defer close(stopCh)
wrappingServer.PrepareRun()
wrappingServer.RunPostStartHooks(stopCh)
wrappingServer.RunPostStartHooks(ctx)
server := httptest.NewServer(wrappingServer.Handler)
defer server.Close()

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/managedfields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/admission"
@ -442,9 +443,19 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
// This is the diagram of what channels/signals are dependent on each other:
//
// | stopCh
// Deprecated: use RunWithContext instead. Run will not get removed to avoid
// breaking consumers, but should not be used in new code.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
ctx := wait.ContextForChannel(stopCh)
return s.RunWithContext(ctx)
}
// RunWithContext spawns the secure http server. It only returns if ctx is canceled
// or the secure port cannot be listened on initially.
// This is the diagram of what contexts/channels/signals are dependent on each other:
//
// | ctx
// | |
// | ---------------------------------------------------------
// | | |
@ -477,12 +488,13 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// | | | |
// | |-------------------|---------------------|----------------------------------------|
// | | |
// | stopHttpServerCh (AuditBackend::Shutdown())
// | stopHttpServerCtx (AuditBackend::Shutdown())
// | |
// | listenerStoppedCh
// | |
// | HTTPServerStoppedListening (httpServerStoppedListeningCh)
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error {
stopCh := ctx.Done()
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
@ -544,9 +556,11 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := make(chan struct{})
// Canceling the parent context does not immediately cancel the HTTP server.
// We only inherit context values here and deal with cancellation ourselves.
stopHTTPServerCtx, stopHTTPServer := context.WithCancelCause(context.WithoutCancel(ctx))
go func() {
defer close(stopHttpServerCh)
defer stopHTTPServer(errors.New("time to stop HTTP server"))
timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
if s.ShutdownSendRetryAfter {
@ -565,7 +579,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
}
}
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
stoppedCh, listenerStoppedCh, err := s.NonBlockingRunWithContext(stopHTTPServerCtx, shutdownTimeout)
if err != nil {
return err
}
@ -694,7 +708,18 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
//
// Deprecated: use RunWithContext instead. Run will not get removed to avoid
// breaking consumers, but should not be used in new code.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
ctx := wait.ContextForChannel(stopCh)
return s.NonBlockingRunWithContext(ctx, shutdownTimeout)
}
// NonBlockingRunWithContext spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
func (s preparedGenericAPIServer) NonBlockingRunWithContext(ctx context.Context, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
// Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
@ -712,11 +737,11 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow
// responsibility of the caller to close the provided channel to
// ensure cleanup.
go func() {
<-stopCh
<-ctx.Done()
close(internalStopCh)
}()
s.RunPostStartHooks(stopCh)
s.RunPostStartHooks(ctx)
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)

View File

@ -20,6 +20,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"log"
@ -41,6 +42,7 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
@ -200,10 +202,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
}, nil)
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -222,7 +229,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
}
// signal termination event: initiate a shutdown
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntilSignaled(t, signals.ShutdownInitiated)
// /readyz must return an error, but we need to give it some time
@ -423,10 +430,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
}, nil)
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -445,7 +457,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
}
// signal termination event: initiate a shutdown
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntilSignaled(t, signals.ShutdownInitiated)
// /readyz must return an error, but we need to give it some time
@ -568,10 +580,15 @@ func TestMuxAndDiscoveryComplete(t *testing.T) {
}
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -612,6 +629,9 @@ func TestPreShutdownHooks(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
s := test.server()
doer := setupDoer(t, s.SecureServingInfo)
@ -643,14 +663,16 @@ func TestPreShutdownHooks(t *testing.T) {
}
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return")

View File

@ -52,6 +52,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2/ktesting"
kubeopenapi "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/validation/spec"
netutils "k8s.io/utils/net"
@ -317,17 +318,16 @@ func TestInstallAPIGroups(t *testing.T) {
}
func TestPrepareRun(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
s, config, assert := newMaster(t)
assert.NotNil(config.OpenAPIConfig)
server := httptest.NewServer(s.Handler.Director)
defer server.Close()
done := make(chan struct{})
defer close(done)
s.PrepareRun()
s.RunPostStartHooks(done)
s.RunPostStartHooks(ctx)
// openapi is installed in PrepareRun
resp, err := http.Get(server.URL + "/openapi/v2")
@ -353,9 +353,10 @@ func TestPrepareRun(t *testing.T) {
}
func TestUpdateOpenAPISpec(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
s, _, assert := newMaster(t)
s.PrepareRun()
s.RunPostStartHooks(make(chan struct{}))
s.RunPostStartHooks(ctx)
server := httptest.NewServer(s.Handler.Director)
defer server.Close()

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"net/http"
@ -48,8 +49,13 @@ type PreShutdownHookFunc func() error
type PostStartHookContext struct {
// LoopbackClientConfig is a config for a privileged loopback connection to the API server
LoopbackClientConfig *restclient.Config
// StopCh is the channel that will be closed when the server stops
// StopCh is the channel that will be closed when the server stops.
//
// Deprecated: use the PostStartHookContext itself instead, it contains a context that
// gets cancelled when the server stops. StopCh keeps getting provided for existing code.
StopCh <-chan struct{}
// Context gets cancelled when the server stops.
context.Context
}
// PostStartHookProvider is an interface in addition to provide a post start hook for the api server
@ -151,15 +157,16 @@ func (s *GenericAPIServer) AddPreShutdownHookOrDie(name string, hook PreShutdown
}
}
// RunPostStartHooks runs the PostStartHooks for the server
func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
// RunPostStartHooks runs the PostStartHooks for the server.
func (s *GenericAPIServer) RunPostStartHooks(ctx context.Context) {
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
s.postStartHooksCalled = true
context := PostStartHookContext{
LoopbackClientConfig: s.LoopbackClientConfig,
StopCh: stopCh,
StopCh: ctx.Done(),
Context: ctx,
}
for hookName, hookEntry := range s.postStartHooks {

View File

@ -18,6 +18,7 @@ package options
import (
"bytes"
"context"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/tls"
@ -25,6 +26,7 @@ import (
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
"math/big"
@ -44,6 +46,7 @@ import (
"k8s.io/client-go/discovery"
restclient "k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)
@ -215,6 +218,10 @@ func TestServerRunWithSNI(t *testing.T) {
test := tests[title]
t.Run(title, func(t *testing.T) {
t.Parallel()
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("test has completed"))
// create server cert
certDir := "testdata/" + specToName(test.Cert)
serverCertBundleFile := filepath.Join(certDir, "cert")
@ -267,9 +274,6 @@ func TestServerRunWithSNI(t *testing.T) {
signatures[sig] = j
}
stopCh := make(chan struct{})
defer close(stopCh)
// launch server
config := setUp(t)
@ -316,7 +320,7 @@ func TestServerRunWithSNI(t *testing.T) {
preparedServer := s.PrepareRun()
preparedServerErrors := make(chan error)
go func() {
if err := preparedServer.Run(stopCh); err != nil {
if err := preparedServer.RunWithContext(ctx); err != nil {
preparedServerErrors <- err
}
}()

View File

@ -32,9 +32,9 @@ import (
)
func main() {
stopCh := genericapiserver.SetupSignalHandler()
ctx := genericapiserver.SetupSignalContext()
options := server.NewDefaultOptions(os.Stdout, os.Stderr)
cmd := server.NewCommandStartAggregator(options, stopCh)
cmd := server.NewCommandStartAggregator(ctx, options)
code := cli.Run(cmd)
os.Exit(code)
}

View File

@ -115,7 +115,7 @@ type CompletedConfig struct {
}
type runnable interface {
Run(stopCh <-chan struct{}) error
RunWithContext(ctx context.Context) error
}
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
@ -479,8 +479,8 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
func (s preparedAPIAggregator) Run(ctx context.Context) error {
return s.runnable.RunWithContext(ctx)
}
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"io"
@ -55,7 +56,7 @@ type AggregatorOptions struct {
// NewCommandStartAggregator provides a CLI handler for 'start master' command
// with a default AggregatorOptions.
func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct{}) *cobra.Command {
func NewCommandStartAggregator(ctx context.Context, defaults *AggregatorOptions) *cobra.Command {
o := *defaults
cmd := &cobra.Command{
Short: "Launch a API aggregator and proxy server",
@ -67,12 +68,13 @@ func NewCommandStartAggregator(defaults *AggregatorOptions, stopCh <-chan struct
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunAggregator(stopCh); err != nil {
if err := o.RunAggregator(c.Context()); err != nil {
return err
}
return nil
},
}
cmd.SetContext(ctx)
o.AddFlags(cmd.Flags())
return cmd
@ -119,7 +121,7 @@ func (o *AggregatorOptions) Complete() error {
}
// RunAggregator runs the API Aggregator.
func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
func (o AggregatorOptions) RunAggregator(ctx context.Context) error {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, nil); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err)
@ -171,5 +173,5 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
if err != nil {
return err
}
return prepared.Run(stopCh)
return prepared.Run(ctx)
}

View File

@ -25,9 +25,9 @@ import (
)
func main() {
stopCh := genericapiserver.SetupSignalHandler()
ctx := genericapiserver.SetupSignalContext()
options := server.NewWardleServerOptions(os.Stdout, os.Stderr)
cmd := server.NewCommandStartWardleServer(options, stopCh)
cmd := server.NewCommandStartWardleServer(ctx, options)
code := cli.Run(cmd)
os.Exit(code)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"fmt"
"io"
"net"
@ -71,7 +72,7 @@ func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions {
// NewCommandStartWardleServer provides a CLI handler for 'start master' command
// with a default WardleServerOptions.
func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan struct{}) *cobra.Command {
func NewCommandStartWardleServer(ctx context.Context, defaults *WardleServerOptions) *cobra.Command {
o := *defaults
cmd := &cobra.Command{
Short: "Launch a wardle API server",
@ -83,12 +84,13 @@ func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan st
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunWardleServer(stopCh); err != nil {
if err := o.RunWardleServer(c.Context()); err != nil {
return err
}
return nil
},
}
cmd.SetContext(ctx)
flags := cmd.Flags()
o.RecommendedOptions.AddFlags(flags)
@ -154,7 +156,7 @@ func (o *WardleServerOptions) Config() (*apiserver.Config, error) {
}
// RunWardleServer starts a new WardleServer given WardleServerOptions
func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error {
func (o WardleServerOptions) RunWardleServer(ctx context.Context) error {
config, err := o.Config()
if err != nil {
return err
@ -171,5 +173,5 @@ func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error {
return nil
})
return server.GenericAPIServer.PrepareRun().Run(stopCh)
return server.GenericAPIServer.PrepareRun().RunWithContext(ctx)
}

View File

@ -101,7 +101,7 @@ func NewController(
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
logger.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

View File

@ -17,6 +17,8 @@ limitations under the License.
package services
import (
"context"
"errors"
"fmt"
"os"
"testing"
@ -45,19 +47,20 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0
// APIServer is a server which manages apiserver.
type APIServer struct {
storageConfig storagebackend.Config
stopCh chan struct{}
cancel func(error)
}
// NewAPIServer creates an apiserver.
func NewAPIServer(storageConfig storagebackend.Config) *APIServer {
return &APIServer{
storageConfig: storageConfig,
stopCh: make(chan struct{}),
}
}
// Start starts the apiserver, returns when apiserver is ready.
func (a *APIServer) Start() error {
// The background goroutine runs until the context is canceled
// or Stop is called, whether happens first.
func (a *APIServer) Start(ctx context.Context) error {
const tokenFilePath = "known_tokens.csv"
o := options.NewServerRunOptions()
@ -93,9 +96,12 @@ func (a *APIServer) Start() error {
o.KubeletConfig.PreferredAddressTypes = []string{"InternalIP"}
ctx, cancel := context.WithCancelCause(ctx)
a.cancel = cancel
errCh := make(chan error)
go func() {
defer close(errCh)
defer cancel(errors.New("shutting down")) // Calling Stop is optional, but cancel always should be invoked.
completedOptions, err := o.Complete()
if err != nil {
errCh <- fmt.Errorf("set apiserver default options error: %w", err)
@ -106,7 +112,7 @@ func (a *APIServer) Start() error {
return
}
err = apiserver.Run(completedOptions, a.stopCh)
err = apiserver.Run(ctx, completedOptions)
if err != nil {
errCh <- fmt.Errorf("run apiserver error: %w", err)
return
@ -120,12 +126,11 @@ func (a *APIServer) Start() error {
return nil
}
// Stop stops the apiserver. Currently, there is no way to stop the apiserver.
// The function is here only for completion.
// Stop stops the apiserver. Does not block.
func (a *APIServer) Stop() error {
if a.stopCh != nil {
close(a.stopCh)
a.stopCh = nil
// nil when Start has never been called.
if a.cancel != nil {
a.cancel(errors.New("stopping API server"))
}
return nil
}

View File

@ -24,8 +24,8 @@ import (
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/klog/v2"
)
@ -58,17 +58,17 @@ func (es *e2eServices) run(t *testing.T) error {
// start starts the tests embedded services or returns an error.
func (es *e2eServices) start(t *testing.T) error {
_, ctx := ktesting.NewTestContext(t)
tCtx := ktesting.Init(t)
klog.Info("Starting e2e services...")
err := es.startEtcd(t)
if err != nil {
return err
}
err = es.startAPIServer(es.etcdStorage)
err = es.startAPIServer(tCtx, es.etcdStorage)
if err != nil {
return err
}
err = es.startNamespaceController(ctx)
err = es.startNamespaceController(tCtx)
if err != nil {
return nil
}
@ -121,10 +121,10 @@ func (es *e2eServices) startEtcd(t *testing.T) error {
}
// startAPIServer starts the embedded API server or returns an error.
func (es *e2eServices) startAPIServer(etcdStorage *storagebackend.Config) error {
func (es *e2eServices) startAPIServer(ctx context.Context, etcdStorage *storagebackend.Config) error {
klog.Info("Starting API server")
es.apiServer = NewAPIServer(*etcdStorage)
return es.apiServer.Start()
return es.apiServer.Start(ctx)
}
// startNamespaceController starts the embedded namespace controller or returns an error.

View File

@ -94,7 +94,7 @@ func TestSecretsShouldBeTransformed(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create test secret, error: %v", err)
}
test.runResource(test.logger, tt.unSealFunc, tt.transformerPrefix, "", "v1", "secrets", test.secret.Name, test.secret.Namespace)
test.runResource(test.TContext, tt.unSealFunc, tt.transformerPrefix, "", "v1", "secrets", test.secret.Name, test.secret.Namespace)
test.cleanUp()
}
}

View File

@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"
)
@ -75,7 +76,7 @@ const (
type unSealSecret func(ctx context.Context, cipherText []byte, dataCtx value.Context, config apiserverv1.ProviderConfiguration) ([]byte, error)
type transformTest struct {
logger kubeapiservertesting.Logger
ktesting.TContext
storageConfig *storagebackend.Config
configDir string
transformerConfig string
@ -85,12 +86,13 @@ type transformTest struct {
secret *corev1.Secret
}
func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML string, reload bool, configDir string, storageConfig *storagebackend.Config) (*transformTest, error) {
func newTransformTest(tb testing.TB, transformerConfigYAML string, reload bool, configDir string, storageConfig *storagebackend.Config) (*transformTest, error) {
tCtx := ktesting.Init(tb)
if storageConfig == nil {
storageConfig = framework.SharedEtcd()
}
e := transformTest{
logger: l,
TContext: tCtx,
transformerConfig: transformerConfigYAML,
storageConfig: storageConfig,
}
@ -113,7 +115,7 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin
return nil, fmt.Errorf("failed to read config file: %w", err)
}
if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(l, nil, e.getEncryptionOptions(reload), e.storageConfig); err != nil {
if e.kubeAPIServer, err = kubeapiservertesting.StartTestServer(tb, nil, e.getEncryptionOptions(reload), e.storageConfig); err != nil {
e.cleanUp()
return nil, fmt.Errorf("failed to start KubeAPI server: %w", err)
}
@ -131,11 +133,11 @@ func newTransformTest(l kubeapiservertesting.Logger, transformerConfigYAML strin
if transformerConfigYAML != "" && reload {
// when reloading is enabled, this healthz endpoint is always present
mustBeHealthy(l, "/kms-providers", "ok", e.kubeAPIServer.ClientConfig)
mustNotHaveLivez(l, "/kms-providers", "404 page not found", e.kubeAPIServer.ClientConfig)
mustBeHealthy(tCtx, "/kms-providers", "ok", e.kubeAPIServer.ClientConfig)
mustNotHaveLivez(tCtx, "/kms-providers", "404 page not found", e.kubeAPIServer.ClientConfig)
// excluding healthz endpoints even if they do not exist should work
mustBeHealthy(l, "", `warn: some health checks cannot be excluded: no matches for "kms-provider-0","kms-provider-1","kms-provider-2","kms-provider-3"`,
mustBeHealthy(tCtx, "", `warn: some health checks cannot be excluded: no matches for "kms-provider-0","kms-provider-1","kms-provider-2","kms-provider-3"`,
e.kubeAPIServer.ClientConfig, "kms-provider-0", "kms-provider-1", "kms-provider-2", "kms-provider-3")
}
@ -530,7 +532,7 @@ func (e *transformTest) writeRawRecordToETCD(path string, data []byte) (*clientv
}
func (e *transformTest) printMetrics() error {
e.logger.Logf("Transformation Metrics:")
e.Logf("Transformation Metrics:")
metrics, err := legacyregistry.DefaultGatherer.Gather()
if err != nil {
return fmt.Errorf("failed to gather metrics: %s", err)
@ -538,9 +540,9 @@ func (e *transformTest) printMetrics() error {
for _, mf := range metrics {
if strings.HasPrefix(*mf.Name, metricsPrefix) {
e.logger.Logf("%s", *mf.Name)
e.Logf("%s", *mf.Name)
for _, metric := range mf.GetMetric() {
e.logger.Logf("%v", metric)
e.Logf("%v", metric)
}
}
}

View File

@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
netutils "k8s.io/utils/net"
// install all APIs
@ -64,6 +65,8 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0
// StartRealAPIServerOrDie starts an API server that is appropriate for use in tests that require one of every resource
func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOptions)) *APIServer {
tCtx := ktesting.Init(t)
certDir, err := os.MkdirTemp("", t.Name())
if err != nil {
t.Fatal(err)
@ -148,7 +151,6 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)
stopCh := make(chan struct{})
errCh := make(chan error)
go func() {
// Catch panics that occur in this go routine so we get a comprehensible failure
@ -164,7 +166,7 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
errCh <- err
return
}
if err := prepared.Run(stopCh); err != nil {
if err := prepared.Run(tCtx); err != nil {
errCh <- err
t.Error(err)
return
@ -215,9 +217,9 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
}
cleanup := func() {
// Closing stopCh is stopping apiserver and cleaning up
// Cancel stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
tCtx.Cancel("cleaning up")
// If the apiserver was started, let's wait for it to
// shutdown clearly.

View File

@ -62,9 +62,6 @@ func TestAPIServiceWaitOnStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)
stopCh := make(chan struct{})
defer close(stopCh)
etcdConfig := framework.SharedEtcd()
etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport)
@ -235,9 +232,6 @@ func TestAggregatedAPIServer(t *testing.T) {
// makes the kube-apiserver very responsive. it's normally a minute
dynamiccertificates.FileRefreshDuration = 1 * time.Second
stopCh := make(chan struct{})
defer close(stopCh)
// we need the wardle port information first to set up the service resolver
listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{})
if err != nil {
@ -291,7 +285,7 @@ func TestAggregatedAPIServer(t *testing.T) {
}
o.RecommendedOptions.SecureServing.Listener = listener
o.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1")
wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh)
wardleCmd := sampleserver.NewCommandStartWardleServer(ctx, o)
wardleCmd.SetArgs([]string{
"--authentication-kubeconfig", wardleToKASKubeConfigFile,
"--authorization-kubeconfig", wardleToKASKubeConfigFile,

View File

@ -176,7 +176,7 @@ func StartTestServer(ctx context.Context, t testing.TB, setup TestServerSetup) (
errCh = make(chan error)
go func() {
defer close(errCh)
if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().Run(ctx.Done()); err != nil {
if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil {
errCh <- err
}
}()