diff --git a/test/integration/apiserver/watchcache_test.go b/test/integration/apiserver/watchcache_test.go index 70a6b05d973..f98343e31d2 100644 --- a/test/integration/apiserver/watchcache_test.go +++ b/test/integration/apiserver/watchcache_test.go @@ -34,7 +34,7 @@ import ( // setup create kube-apiserver backed up by two separate etcds, // with one of them containing events and the other all other objects. -func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.CloseFunc) { +func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.TearDownFunc) { etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"} etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs) if err != nil { diff --git a/test/integration/framework/controlplane_utils.go b/test/integration/framework/controlplane_utils.go index d1416342b9f..b328c88f610 100644 --- a/test/integration/framework/controlplane_utils.go +++ b/test/integration/framework/controlplane_utils.go @@ -17,48 +17,19 @@ limitations under the License. package framework import ( - "context" - "flag" - "net" - "net/http" - "net/http/httptest" "path" - "strconv" - "time" "github.com/google/uuid" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" - authauthenticator "k8s.io/apiserver/pkg/authentication/authenticator" - "k8s.io/apiserver/pkg/authentication/authenticatorfactory" - authenticatorunion "k8s.io/apiserver/pkg/authentication/request/union" - "k8s.io/apiserver/pkg/authentication/user" - "k8s.io/apiserver/pkg/authorization/authorizer" - "k8s.io/apiserver/pkg/authorization/authorizerfactory" - authorizerunion "k8s.io/apiserver/pkg/authorization/union" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" - genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/options" - serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" - utilfeature "k8s.io/apiserver/pkg/util/feature" - utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" utilopenapi "k8s.io/apiserver/pkg/util/openapi" - "k8s.io/client-go/informers" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - "k8s.io/component-base/version" - "k8s.io/klog/v2" openapicommon "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/validation/spec" "k8s.io/kubernetes/pkg/api/legacyscheme" - "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/generated/openapi" - "k8s.io/kubernetes/pkg/kubeapiserver" - kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" - netutils "k8s.io/utils/net" ) const ( @@ -72,56 +43,6 @@ const ( // plane or choose some different minimum verbosity. var MinVerbosity = 4 -// Config is a struct of configuration directives for NewControlPlaneComponents. -type Config struct { - // If nil, a default is used, partially filled configs will not get populated. - InstanceConfig *controlplane.Config - StartReplicationManager bool - // Client throttling qps - QPS float32 - // Client burst qps, also burst replicas allowed in rc manager - Burst int - // TODO: Add configs for endpoints controller, scheduler etc -} - -// alwaysAllow always allows an action -type alwaysAllow struct{} - -func (alwaysAllow) Authorize(ctx context.Context, requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) { - return authorizer.DecisionAllow, "always allow", nil -} - -// unsecuredUser simulates requests to the unsecured endpoint for old tests -func unsecuredUser(req *http.Request) (*authauthenticator.Response, bool, error) { - auth := req.Header.Get("Authorization") - if len(auth) != 0 { - return nil, false, nil - } - return &authauthenticator.Response{ - User: &user.DefaultInfo{ - Name: "system:unsecured", - Groups: []string{user.SystemPrivilegedGroup, user.AllAuthenticated}, - }, - }, true, nil -} - -// APIServerReceiver can be used to provide the API server to a custom incoming server function -type APIServerReceiver interface { - SetAPIServer(m *controlplane.Instance) -} - -// APIServerHolder implements -type APIServerHolder struct { - Initialized chan struct{} - M *controlplane.Instance -} - -// SetAPIServer assigns the current API server. -func (h *APIServerHolder) SetAPIServer(m *controlplane.Instance) { - h.M = m - close(h.Initialized) -} - // DefaultOpenAPIConfig returns an openapicommon.Config initialized to default values. func DefaultOpenAPIConfig() *openapicommon.Config { openAPIConfig := genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme)) @@ -160,180 +81,6 @@ func DefaultOpenAPIV3Config() *openapicommon.Config { return openAPIConfig } -// startAPIServerOrDie starts a kubernetes API server and an httpserver to handle api requests -func startAPIServerOrDie(controlPlaneConfig *controlplane.Config, incomingServer *httptest.Server, apiServerReceiver APIServerReceiver) (*controlplane.Instance, *httptest.Server, CloseFunc) { - var m *controlplane.Instance - var s *httptest.Server - - // Ensure we log at least at the desired level - v := flag.Lookup("v").Value - level, _ := strconv.Atoi(v.String()) - if level < MinVerbosity { - v.Set(strconv.Itoa(MinVerbosity)) - } - - if incomingServer != nil { - s = incomingServer - } else { - s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - m.GenericAPIServer.Handler.ServeHTTP(w, req) - })) - } - - stopCh := make(chan struct{}) - - // the APIServer implements logic to handle the shutdown process, taking care of draining - // the connections, closing the listener socket, running the preShutdown hooks, stopping the postStartHooks, ... - // In the integration framework we don't have that logic so we try to emulate a similar shutdown process. - // Ref: staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go - closeFn := func() { - if m != nil { - m.GenericAPIServer.RunPreShutdownHooks() - } - // Signal RunPostStartHooks to finish - close(stopCh) - // Clean up APIServer resources - m.GenericAPIServer.Destroy() - // At this point the APIserver was already "destroyed", new requests will not be processed, - // however, the httptest.Server.Close() method will block if there are active connections. - // To avoid that any spurious connection keeps the test hanging, we forcefully close the - // connections before shuting down the server. There is a small window where new connections - // can be initiated but is unlikely those move to active, hanging the server shutdown. - s.CloseClientConnections() - s.Close() - } - - if controlPlaneConfig == nil { - controlPlaneConfig = NewControlPlaneConfig() - controlPlaneConfig.GenericConfig.OpenAPIConfig = DefaultOpenAPIConfig() - } - - // set the loopback client config - if controlPlaneConfig.GenericConfig.LoopbackClientConfig == nil { - controlPlaneConfig.GenericConfig.LoopbackClientConfig = &restclient.Config{QPS: 50, Burst: 100, ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs}} - } - controlPlaneConfig.GenericConfig.LoopbackClientConfig.Host = s.URL - - privilegedLoopbackToken := uuid.New().String() - // wrap any available authorizer - tokens := make(map[string]*user.DefaultInfo) - tokens[privilegedLoopbackToken] = &user.DefaultInfo{ - Name: user.APIServerUser, - UID: uuid.New().String(), - Groups: []string{user.SystemPrivilegedGroup, user.AllAuthenticated}, - } - tokens[UnprivilegedUserToken] = &user.DefaultInfo{ - Name: "unprivileged", - UID: uuid.New().String(), - Groups: []string{user.AllAuthenticated}, - } - - tokenAuthenticator := authenticatorfactory.NewFromTokens(tokens, controlPlaneConfig.GenericConfig.Authentication.APIAudiences) - if controlPlaneConfig.GenericConfig.Authentication.Authenticator == nil { - controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(tokenAuthenticator, authauthenticator.RequestFunc(unsecuredUser)) - } else { - controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(tokenAuthenticator, controlPlaneConfig.GenericConfig.Authentication.Authenticator) - } - - if controlPlaneConfig.GenericConfig.Authorization.Authorizer != nil { - tokenAuthorizer := authorizerfactory.NewPrivilegedGroups(user.SystemPrivilegedGroup) - controlPlaneConfig.GenericConfig.Authorization.Authorizer = authorizerunion.New(tokenAuthorizer, controlPlaneConfig.GenericConfig.Authorization.Authorizer) - } else { - controlPlaneConfig.GenericConfig.Authorization.Authorizer = alwaysAllow{} - } - - controlPlaneConfig.GenericConfig.LoopbackClientConfig.BearerToken = privilegedLoopbackToken - - clientset, err := clientset.NewForConfig(controlPlaneConfig.GenericConfig.LoopbackClientConfig) - if err != nil { - closeFn() - klog.Fatal(err) - } - - controlPlaneConfig.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, controlPlaneConfig.GenericConfig.LoopbackClientConfig.Timeout) - - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) { - controlPlaneConfig.GenericConfig.FlowControl = utilflowcontrol.New( - controlPlaneConfig.ExtraConfig.VersionedInformers, - clientset.FlowcontrolV1beta2(), - controlPlaneConfig.GenericConfig.MaxRequestsInFlight+controlPlaneConfig.GenericConfig.MaxMutatingRequestsInFlight, - controlPlaneConfig.GenericConfig.RequestTimeout/4, - ) - } - - if controlPlaneConfig.ExtraConfig.ServiceIPRange.IP == nil { - controlPlaneConfig.ExtraConfig.ServiceIPRange = net.IPNet{IP: netutils.ParseIPSloppy("10.0.0.0"), Mask: net.CIDRMask(24, 32)} - } - m, err = controlPlaneConfig.Complete().New(genericapiserver.NewEmptyDelegate()) - if err != nil { - // We log the error first so that even if closeFn crashes, the error is shown - klog.Errorf("error in bringing up the apiserver: %v", err) - closeFn() - klog.Fatalf("error in bringing up the apiserver: %v", err) - } - if apiServerReceiver != nil { - apiServerReceiver.SetAPIServer(m) - } - - // TODO have this start method actually use the normal start sequence for the API server - // this method never actually calls the `Run` method for the API server - // fire the post hooks ourselves - m.GenericAPIServer.PrepareRun() - m.GenericAPIServer.RunPostStartHooks(stopCh) - - cfg := *controlPlaneConfig.GenericConfig.LoopbackClientConfig - cfg.ContentConfig.GroupVersion = &schema.GroupVersion{} - privilegedClient, err := restclient.RESTClientFor(&cfg) - if err != nil { - closeFn() - klog.Fatal(err) - } - var lastHealthContent []byte - err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - result := privilegedClient.Get().AbsPath("/healthz").Do(ctx) - status := 0 - result.StatusCode(&status) - if status == 200 { - return true, nil - } - lastHealthContent, _ = result.Raw() - return false, nil - }) - if err != nil { - closeFn() - klog.Errorf("last health content: %q", string(lastHealthContent)) - klog.Fatal(err) - } - - return m, s, closeFn -} - -// NewIntegrationTestControlPlaneConfig returns the control plane config appropriate for most integration tests. -func NewIntegrationTestControlPlaneConfig() *controlplane.Config { - return NewIntegrationTestControlPlaneConfigWithOptions(&ControlPlaneConfigOptions{}) -} - -// NewIntegrationTestControlPlaneConfigWithOptions returns the control plane config appropriate for most integration tests -// configured with the provided options. -func NewIntegrationTestControlPlaneConfigWithOptions(opts *ControlPlaneConfigOptions) *controlplane.Config { - controlPlaneConfig := NewControlPlaneConfigWithOptions(opts) - controlPlaneConfig.GenericConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4") - controlPlaneConfig.ExtraConfig.APIResourceConfigSource = controlplane.DefaultAPIResourceConfigSource() - - // TODO: get rid of these tests or port them to secure serving - controlPlaneConfig.GenericConfig.SecureServing = &genericapiserver.SecureServingInfo{Listener: fakeLocalhost443Listener{}} - - return controlPlaneConfig -} - -// ControlPlaneConfigOptions are the configurable options for a new integration test control plane config. -type ControlPlaneConfigOptions struct { - EtcdOptions *options.EtcdOptions -} - // DefaultEtcdOptions are the default EtcdOptions for use with integration tests. func DefaultEtcdOptions() *options.EtcdOptions { // This causes the integration tests to exercise the etcd @@ -344,101 +91,9 @@ func DefaultEtcdOptions() *options.EtcdOptions { return etcdOptions } -// NewControlPlaneConfig returns a basic control plane config. -func NewControlPlaneConfig() *controlplane.Config { - return NewControlPlaneConfigWithOptions(&ControlPlaneConfigOptions{}) -} - -// NewControlPlaneConfigWithOptions returns a basic control plane config configured with the provided options. -func NewControlPlaneConfigWithOptions(opts *ControlPlaneConfigOptions) *controlplane.Config { - etcdOptions := DefaultEtcdOptions() - if opts.EtcdOptions != nil { - etcdOptions = opts.EtcdOptions - } - - storageConfig := kubeapiserver.NewStorageFactoryConfig() - storageConfig.APIResourceConfig = serverstorage.NewResourceConfig() - completedStorageConfig, err := storageConfig.Complete(etcdOptions) - if err != nil { - panic(err) - } - storageFactory, err := completedStorageConfig.New() - if err != nil { - panic(err) - } - - genericConfig := genericapiserver.NewConfig(legacyscheme.Codecs) - kubeVersion := version.Get() - if len(kubeVersion.Major) == 0 { - kubeVersion.Major = "1" - } - if len(kubeVersion.Minor) == 0 { - kubeVersion.Minor = "22" - } - genericConfig.Version = &kubeVersion - genericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer() - - // TODO: get rid of these tests or port them to secure serving - genericConfig.SecureServing = &genericapiserver.SecureServingInfo{Listener: fakeLocalhost443Listener{}} - // if using endpoint reconciler the service subnet IP family must match the Public address - genericConfig.PublicAddress = netutils.ParseIPSloppy("10.1.1.1") - err = etcdOptions.ApplyWithStorageFactoryTo(storageFactory, genericConfig) - if err != nil { - panic(err) - } - - return &controlplane.Config{ - GenericConfig: genericConfig, - ExtraConfig: controlplane.ExtraConfig{ - APIResourceConfigSource: controlplane.DefaultAPIResourceConfigSource(), - StorageFactory: storageFactory, - KubeletClientConfig: kubeletclient.KubeletClientConfig{Port: 10250}, - APIServerServicePort: 443, - MasterCount: 1, - }, - } -} - -// CloseFunc can be called to cleanup the API server -type CloseFunc func() - -// DEPRECATED: Use StartTestServer or directly StartTestServer directly -// from cmd/kube-apiserver/app/testing. -// -// RunAnAPIServer starts a API server with the provided config. -func RunAnAPIServer(controlPlaneConfig *controlplane.Config) (*controlplane.Instance, *httptest.Server, CloseFunc) { - if controlPlaneConfig == nil { - controlPlaneConfig = NewControlPlaneConfig() - controlPlaneConfig.GenericConfig.EnableProfiling = true - } - return startAPIServerOrDie(controlPlaneConfig, nil, nil) -} - -// RunAnAPIServerUsingServer starts up an instance using the provided config on the specified server. -func RunAnAPIServerUsingServer(controlPlaneConfig *controlplane.Config, s *httptest.Server, apiServerReceiver APIServerReceiver) (*controlplane.Instance, *httptest.Server, CloseFunc) { - return startAPIServerOrDie(controlPlaneConfig, s, apiServerReceiver) -} - // SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix. func SharedEtcd() *storagebackend.Config { cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New().String(), "registry"), nil) cfg.Transport.ServerList = []string{GetEtcdURL()} return cfg } - -type fakeLocalhost443Listener struct{} - -func (fakeLocalhost443Listener) Accept() (net.Conn, error) { - return nil, nil -} - -func (fakeLocalhost443Listener) Close() error { - return nil -} - -func (fakeLocalhost443Listener) Addr() net.Addr { - return &net.TCPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 443, - } -} diff --git a/test/integration/ipamperf/ipam_test.go b/test/integration/ipamperf/ipam_test.go index db964d409d8..90e931358b5 100644 --- a/test/integration/ipamperf/ipam_test.go +++ b/test/integration/ipamperf/ipam_test.go @@ -27,27 +27,26 @@ import ( "k8s.io/klog/v2" netutils "k8s.io/utils/net" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/controller/nodeipam" "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" + "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/util" ) -func setupAllocator(apiURL string, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*clientset.Clientset, util.ShutdownFunc, error) { +func setupAllocator(kubeConfig *restclient.Config, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*clientset.Clientset, util.ShutdownFunc, error) { controllerStopChan := make(chan struct{}) shutdownFunc := func() { close(controllerStopChan) } - clientSet := clientset.NewForConfigOrDie(&restclient.Config{ - Host: apiURL, - ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, - QPS: float32(config.KubeQPS), - Burst: config.KubeQPS, - }) + clientConfig := restclient.CopyConfig(kubeConfig) + clientConfig.QPS = float32(config.KubeQPS) + clientConfig.Burst = config.KubeQPS + clientSet := clientset.NewForConfigOrDie(clientConfig) sharedInformer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour) ipamController, err := nodeipam.NewNodeIpamController( @@ -63,13 +62,18 @@ func setupAllocator(apiURL string, config *Config, clusterCIDR, serviceCIDR *net return clientSet, shutdownFunc, nil } -func runTest(t *testing.T, apiURL string, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*Results, error) { +func runTest(t *testing.T, kubeConfig *restclient.Config, config *Config, clusterCIDR, serviceCIDR *net.IPNet, subnetMaskSize int) (*Results, error) { t.Helper() klog.Infof("Running test %s", t.Name()) - defer deleteNodes(apiURL, config) // cleanup nodes on after controller shutdown + nodeClientConfig := restclient.CopyConfig(kubeConfig) + nodeClientConfig.QPS = float32(config.CreateQPS) + nodeClientConfig.Burst = config.CreateQPS + nodeClient := clientset.NewForConfigOrDie(nodeClientConfig) - clientSet, shutdownFunc, err := setupAllocator(apiURL, config, clusterCIDR, serviceCIDR, subnetMaskSize) + defer deleteNodes(nodeClient) // cleanup nodes on after controller shutdown + + clientSet, shutdownFunc, err := setupAllocator(kubeConfig, config, clusterCIDR, serviceCIDR, subnetMaskSize) if err != nil { t.Fatalf("Error starting IPAM allocator: %v", err) } @@ -80,7 +84,7 @@ func runTest(t *testing.T, apiURL string, config *Config, clusterCIDR, serviceCI t.Fatalf("Could not start test observer: %v", err) } - if err := createNodes(apiURL, config); err != nil { + if err := createNodes(nodeClient, config); err != nil { t.Fatalf("Could not create nodes: %v", err) } @@ -114,8 +118,13 @@ func TestPerformance(t *testing.T) { t.Skip("Skipping because we want to run short tests") } - apiURL, apiserverShutdown := util.StartApiserver() - defer apiserverShutdown() + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"} + }, + }) + defer tearDownFn() _, clusterCIDR, _ := netutils.ParseCIDRSloppy("10.96.0.0/11") // allows up to 8K nodes _, serviceCIDR, _ := netutils.ParseCIDRSloppy("10.94.0.0/24") // does not matter for test - pick upto 250 services @@ -149,7 +158,7 @@ func TestPerformance(t *testing.T) { t.Fatalf("Unable to create mock cloud: %v", err) } test.Cloud = cloud - if results, err := runTest(t, apiURL, test, clusterCIDR, serviceCIDR, subnetMaskSize); err == nil { + if results, err := runTest(t, kubeConfig, test, clusterCIDR, serviceCIDR, subnetMaskSize); err == nil { allResults = append(allResults, results) } }) diff --git a/test/integration/ipamperf/util.go b/test/integration/ipamperf/util.go index ad1a5ec92b8..39234d00bb3 100644 --- a/test/integration/ipamperf/util.go +++ b/test/integration/ipamperf/util.go @@ -24,9 +24,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" "k8s.io/klog/v2" ) @@ -54,27 +52,15 @@ var ( } ) -func deleteNodes(apiURL string, config *Config) { +func deleteNodes(clientSet *clientset.Clientset) { klog.Info("Deleting nodes") - clientSet := clientset.NewForConfigOrDie(&restclient.Config{ - Host: apiURL, - ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, - QPS: float32(config.CreateQPS), - Burst: config.CreateQPS, - }) noGrace := int64(0) if err := clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{GracePeriodSeconds: &noGrace}, metav1.ListOptions{}); err != nil { klog.Errorf("Error deleting node: %v", err) } } -func createNodes(apiURL string, config *Config) error { - clientSet := clientset.NewForConfigOrDie(&restclient.Config{ - Host: apiURL, - ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, - QPS: float32(config.CreateQPS), - Burst: config.CreateQPS, - }) +func createNodes(clientSet *clientset.Clientset, config *Config) error { klog.Infof("Creating %d nodes", config.NumNodes) for i := 0; i < config.NumNodes; i++ { var err error diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 837f0884958..143fab92513 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1158,7 +1158,7 @@ func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, n return clientSet.BatchV1().Jobs(ns).Create(ctx, jobObj, metav1.CreateOptions{}) } -func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Config, clientset.Interface, *v1.Namespace) { +func setup(t *testing.T, nsBaseName string) (framework.TearDownFunc, *restclient.Config, clientset.Interface, *v1.Namespace) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index b6f9e1ab5c2..125edfe0ab9 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -635,7 +635,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem { b.Fatalf("validate scheduler config file failed: %v", err) } } - finalFunc, podInformer, client, dynClient := mustSetupScheduler(cfg) + finalFunc, podInformer, client, dynClient := mustSetupScheduler(b, cfg) b.Cleanup(finalFunc) var mu sync.Mutex diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 114c119a660..b82c7e414eb 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -26,12 +26,12 @@ import ( "os" "path" "sort" + "testing" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic" coreinformers "k8s.io/client-go/informers/core/v1" @@ -41,6 +41,7 @@ import ( "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" "k8s.io/kube-scheduler/config/v1beta2" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/scheduler/apis/config" kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" "k8s.io/kubernetes/test/integration/framework" @@ -74,23 +75,26 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupScheduler(config *config.KubeSchedulerConfiguration) (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) { +func mustSetupScheduler(b *testing.B, config *config.KubeSchedulerConfiguration) (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) { // Run API server with minimimal logging by default. Can be raised with -v. framework.MinVerbosity = 0 - apiURL, apiShutdown := util.StartApiserver() - var err error + + _, kubeConfig, tearDownFn := framework.StartTestServer(b, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"} + }, + }) // TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to // support this when there is any testcase that depends on such configuration. - cfg := &restclient.Config{ - Host: apiURL, - ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, - QPS: 5000.0, - Burst: 5000, - } + cfg := restclient.CopyConfig(kubeConfig) + cfg.QPS = 5000.0 + cfg.Burst = 5000 // use default component config if config here is nil if config == nil { + var err error config, err = newDefaultComponentConfig() if err != nil { klog.Fatalf("Error creating default component config: %v", err) @@ -105,13 +109,13 @@ func mustSetupScheduler(config *config.KubeSchedulerConfiguration) (util.Shutdow _, podInformer, schedulerShutdown := util.StartScheduler(client, cfg, config) fakePVControllerShutdown := util.StartFakePVController(client) - shutdownFunc := func() { + shutdownFn := func() { fakePVControllerShutdown() schedulerShutdown() - apiShutdown() + tearDownFn() } - return shutdownFunc, podInformer, client, dynClient + return shutdownFn, podInformer, client, dynClient } // Returns the list of scheduled pods in the specified namespaces. diff --git a/test/integration/util/util.go b/test/integration/util/util.go index d04ee18ed47..b88a856a6be 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -65,18 +65,6 @@ import ( // ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module type ShutdownFunc func() -// StartApiserver starts a local API server for testing and returns the handle to the URL and the shutdown function to stop it. -func StartApiserver() (string, ShutdownFunc) { - _, s, closeFn := framework.RunAnAPIServer(framework.NewIntegrationTestControlPlaneConfig()) - - shutdownFunc := func() { - klog.Infof("destroying API server") - closeFn() - klog.Infof("destroyed API server") - } - return s.URL, shutdownFunc -} - // StartScheduler configures and starts a scheduler given a handle to the clientSet interface // and event broadcaster. It returns the running scheduler, podInformer and the shutdown function to stop it. func StartScheduler(clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration) (*scheduler.Scheduler, coreinformers.PodInformer, ShutdownFunc) {