diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 61df9eac35a..3eae02210bb 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -73,8 +73,6 @@ const ( func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller { broadcaster := record.NewBroadcaster() - broadcaster.StartStructuredLogging(0) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"}) if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { @@ -173,6 +171,12 @@ type Controller struct { // endpoints will be handled in parallel. func (e *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() + + // Start events processing pipeline. + e.eventBroadcaster.StartStructuredLogging(0) + e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")}) + defer e.eventBroadcaster.Shutdown() + defer e.queue.ShutDown() klog.Infof("Starting endpoint controller") diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index f91dc4de11d..15778c97a61 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -86,8 +86,6 @@ func NewController(podInformer coreinformers.PodInformer, endpointUpdatesBatchPeriod time.Duration, ) *Controller { broadcaster := record.NewBroadcaster() - broadcaster.StartStructuredLogging(0) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"}) if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { @@ -252,6 +250,12 @@ type Controller struct { // Run will not return until stopCh is closed. func (c *Controller) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + + // Start events processing pipeline. + c.eventBroadcaster.StartLogging(klog.Infof) + c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")}) + defer c.eventBroadcaster.Shutdown() + defer c.queue.ShutDown() klog.Infof("Starting endpoint slice controller") diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go index 11664d98614..ecefec98fba 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go @@ -75,8 +75,6 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer, endpointUpdatesBatchPeriod time.Duration, ) *Controller { broadcaster := record.NewBroadcaster() - broadcaster.StartLogging(klog.Infof) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"}) if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { @@ -207,6 +205,12 @@ type Controller struct { // Run will not return until stopCh is closed. func (c *Controller) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + + // Start events processing pipeline. + c.eventBroadcaster.StartLogging(klog.Infof) + c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")}) + defer c.eventBroadcaster.Shutdown() + defer c.queue.ShutDown() klog.Infof("Starting EndpointSliceMirroring controller") diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go index 883db710da2..960e09855e0 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go @@ -28,9 +28,11 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" ) @@ -58,6 +60,11 @@ func newController(batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMi client, batchPeriod) + // The event processing pipeline is normally started via Run() method. + // However, since we don't start it in unit tests, we explicitly start it here. + esController.eventBroadcaster.StartLogging(klog.Infof) + esController.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) + esController.endpointsSynced = alwaysReady esController.endpointSlicesSynced = alwaysReady esController.servicesSynced = alwaysReady diff --git a/test/integration/apiserver/apply/apply_test.go b/test/integration/apiserver/apply/apply_test.go index acbc493ba80..20765361bd5 100644 --- a/test/integration/apiserver/apply/apply_test.go +++ b/test/integration/apiserver/apply/apply_test.go @@ -34,7 +34,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" yamlutil "k8s.io/apimachinery/pkg/util/yaml" @@ -44,28 +44,23 @@ import ( clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" featuregatetesting "k8s.io/component-base/featuregate/testing" - "k8s.io/kubernetes/pkg/controlplane" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) -// TODO(wojtek-t): Migrate to use testing.TestServer instead. -func setup(t testing.TB, groupVersions ...schema.GroupVersion) (clientset.Interface, framework.CloseFunc) { - opts := framework.ControlPlaneConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()} - opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf" - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&opts) - if len(groupVersions) > 0 { - resourceConfig := controlplane.DefaultAPIResourceConfigSource() - resourceConfig.EnableVersions(groupVersions...) - controlPlaneConfig.ExtraConfig.APIResourceConfigSource = resourceConfig - } - controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) +func setup(t testing.TB) (clientset.Interface, kubeapiservertesting.TearDownFunc) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) - clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL, QPS: -1}) + config := restclient.CopyConfig(server.ClientConfig) + // There are some tests (in scale_test.go) that rely on the response to be returned in JSON. + // So we overwrite it here. + config.ContentType = runtime.ContentTypeJSON + clientSet, err := clientset.NewForConfig(config) if err != nil { t.Fatalf("Error in create clientset: %v", err) } - return clientSet, closeFn + return clientSet, server.TearDownFn } // TestApplyAlsoCreates makes sure that PATCH requests with the apply content type @@ -2812,15 +2807,13 @@ spec: } func TestStopTrackingManagedFieldsOnFeatureDisabled(t *testing.T) { - sharedEtcd := framework.DefaultEtcdOptions() - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&framework.ControlPlaneConfigOptions{ - EtcdOptions: sharedEtcd, - }) - controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() + sharedEtcd := framework.SharedEtcd() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)() - _, instanceConfig, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - client, err := clientset.NewForConfig(&restclient.Config{Host: instanceConfig.URL, QPS: -1}) + + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, sharedEtcd) + client, err := clientset.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("Error in create clientset: %v", err) } @@ -2870,14 +2863,15 @@ spec: } // Restart server with server-side apply disabled - closeFn() + server.TearDownFn() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, false)() - _, instanceConfig, closeFn = framework.RunAnAPIServer(controlPlaneConfig) - client, err = clientset.NewForConfig(&restclient.Config{Host: instanceConfig.URL, QPS: -1}) + + server = kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, sharedEtcd) + defer server.TearDownFn() + client, err = clientset.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("Error in create clientset: %v", err) } - defer closeFn() _, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType). AbsPath("/apis/apps/v1"). diff --git a/test/integration/cronjob/cronjob_test.go b/test/integration/cronjob/cronjob_test.go index fc7de4849d2..855ea9c84ad 100644 --- a/test/integration/cronjob/cronjob_test.go +++ b/test/integration/cronjob/cronjob_test.go @@ -155,12 +155,12 @@ func TestCronJobLaunchesPodAndCleansUp(t *testing.T) { cjClient := clientSet.BatchV1().CronJobs(ns.Name) - stopCh := make(chan struct{}) - defer close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - informerSet.Start(stopCh) - go cjc.Run(context.TODO(), 1) - go jc.Run(context.TODO(), 1) + informerSet.Start(ctx.Done()) + go cjc.Run(ctx, 1) + go jc.Run(ctx, 1) _, err := cjClient.Create(context.TODO(), newCronJob(cronJobName, ns.Name, "* * * * ?"), metav1.CreateOptions{}) if err != nil { diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index 79f0dad5b22..fe1cb4e0cb0 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -51,7 +51,7 @@ import ( var zero = int64(0) -func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) { +func setup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition"}, framework.SharedEtcd()) @@ -74,22 +74,15 @@ func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsC t.Fatalf("error creating DaemonSets controller: %v", err) } - return server.TearDownFn, dc, informers, clientSet -} + ctx, cancel := context.WithCancel(context.Background()) -func setupScheduler( - ctx context.Context, - t *testing.T, - cs clientset.Interface, - informerFactory informers.SharedInformerFactory, -) { eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: cs.EventsV1(), + Interface: clientSet.EventsV1(), }) sched, err := scheduler.New( - cs, - informerFactory, + clientSet, + informers, nil, profile.NewRecorderFactory(eventBroadcaster), ctx.Done(), @@ -99,8 +92,15 @@ func setupScheduler( } eventBroadcaster.StartRecordingToSink(ctx.Done()) - go sched.Run(ctx) + + tearDownFn := func() { + cancel() + server.TearDownFn() + eventBroadcaster.Shutdown() + } + + return ctx, tearDownFn, dc, informers, clientSet } func testLabels() map[string]string { @@ -421,7 +421,7 @@ func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSe func TestOneNodeDaemonLaunchesPod(t *testing.T) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -431,12 +431,6 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - informers.Start(ctx.Done()) go dc.Run(ctx, 2) @@ -460,7 +454,7 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) { func TestSimpleDaemonSetLaunchesPods(t *testing.T) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -470,15 +464,9 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - informers.Start(ctx.Done()) go dc.Run(ctx, 2) - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy _, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{}) @@ -496,7 +484,7 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) { func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -506,15 +494,9 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - informers.Start(ctx.Done()) go dc.Run(ctx, 2) - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy @@ -565,7 +547,7 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) { func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -575,15 +557,9 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - informers.Start(ctx.Done()) go dc.Run(ctx, 2) - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy _, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{}) @@ -612,7 +588,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { // not schedule Pods onto the nodes with insufficient resource. func TestInsufficientCapacityNode(t *testing.T) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "insufficient-capacity", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -622,15 +598,9 @@ func TestInsufficientCapacityNode(t *testing.T) { podInformer := informers.Core().V1().Pods().Informer() nodeClient := clientset.CoreV1().Nodes() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - informers.Start(ctx.Done()) go dc.Run(ctx, 2) - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - ds := newDaemonSet("foo", ns.Name) ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m") ds.Spec.UpdateStrategy = *strategy @@ -676,7 +646,7 @@ func TestInsufficientCapacityNode(t *testing.T) { // TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a // hash collision with an existing ControllerRevision func TestLaunchWithHashCollision(t *testing.T) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -685,15 +655,9 @@ func TestLaunchWithHashCollision(t *testing.T) { podInformer := informers.Core().V1().Pods().Informer() nodeClient := clientset.CoreV1().Nodes() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - informers.Start(ctx.Done()) go dc.Run(ctx, 2) - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - // Create single node _, err := nodeClient.Create(context.TODO(), newNode("single-node", nil), metav1.CreateOptions{}) if err != nil { @@ -787,7 +751,7 @@ func TestLaunchWithHashCollision(t *testing.T) { // 2. Add a node to ensure the controller sync // 3. The dsc is expected to "PATCH" the existing pod label with new hash and deletes the old controllerrevision once finishes the update func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -796,15 +760,9 @@ func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) { podInformer := informers.Core().V1().Pods().Informer() nodeClient := clientset.CoreV1().Nodes() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - informers.Start(ctx.Done()) go dc.Run(ctx, 2) - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - // Create single node _, err := nodeClient.Create(context.TODO(), newNode("single-node", nil), metav1.CreateOptions{}) if err != nil { @@ -915,7 +873,7 @@ func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) { // TestTaintedNode tests tainted node isn't expected to have pod scheduled func TestTaintedNode(t *testing.T) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "tainted-node", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -925,15 +883,9 @@ func TestTaintedNode(t *testing.T) { podInformer := informers.Core().V1().Pods().Informer() nodeClient := clientset.CoreV1().Nodes() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - informers.Start(ctx.Done()) go dc.Run(ctx, 2) - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy ds, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{}) @@ -980,7 +932,7 @@ func TestTaintedNode(t *testing.T) { // to the Unschedulable nodes. func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) { forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) { - closeFn, dc, informers, clientset := setup(t) + ctx, closeFn, dc, informers, clientset := setup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientset, "daemonset-unschedulable-test", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) @@ -990,15 +942,9 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) { nodeClient := clientset.CoreV1().Nodes() podInformer := informers.Core().V1().Pods().Informer() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - informers.Start(ctx.Done()) go dc.Run(ctx, 2) - // Start Scheduler - setupScheduler(ctx, t, clientset, informers) - ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy ds.Spec.Template.Spec.HostNetwork = true diff --git a/test/integration/defaulttolerationseconds/defaulttolerationseconds_test.go b/test/integration/defaulttolerationseconds/defaulttolerationseconds_test.go index 84c1124fee5..39ee9adbf95 100644 --- a/test/integration/defaulttolerationseconds/defaulttolerationseconds_test.go +++ b/test/integration/defaulttolerationseconds/defaulttolerationseconds_test.go @@ -22,25 +22,23 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/apis/core/helper" + "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds" "k8s.io/kubernetes/test/integration/framework" ) func TestAdmission(t *testing.T) { - controlPlaneConfig := framework.NewControlPlaneConfig() - controlPlaneConfig.GenericConfig.EnableProfiling = true - controlPlaneConfig.GenericConfig.AdmissionControl = defaulttolerationseconds.NewDefaultTolerationSeconds() - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerConfig: func(cfg *controlplane.Config) { + cfg.GenericConfig.EnableProfiling = true + cfg.GenericConfig.AdmissionControl = defaulttolerationseconds.NewDefaultTolerationSeconds() + }, + }) + defer tearDownFn() - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - - ns := framework.CreateTestingNamespace("default-toleration-seconds", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(client, "default-toleration-seconds", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) pod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/integration/dualstack/dualstack_endpoints_test.go b/test/integration/dualstack/dualstack_endpoints_test.go index 970e70fc761..4d7308a5da3 100644 --- a/test/integration/dualstack/dualstack_endpoints_test.go +++ b/test/integration/dualstack/dualstack_endpoints_test.go @@ -29,42 +29,31 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/endpointslice" "k8s.io/kubernetes/test/integration/framework" - netutils "k8s.io/utils/net" ) func TestDualStackEndpoints(t *testing.T) { // Create an IPv4IPv6 dual stack control-plane serviceCIDR := "10.0.0.0/16" - secondaryServiceCIDR := "2001:db8:1::/48" + secondaryServiceCIDR := "2001:db8:1::/112" labelMap := func() map[string]string { return map[string]string{"foo": "bar"} } - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("Bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - - _, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR) - if err != nil { - t.Fatalf("Bad cidr: %v", err) - } - cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr - - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR) + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -156,8 +145,8 @@ func TestDualStackEndpoints(t *testing.T) { for i, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - ns := framework.CreateTestingNamespace(fmt.Sprintf("test-endpointslice-dualstack-%d", i), t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-dualstack-%d", i), t) + defer framework.DeleteNamespaceOrDie(client, ns, t) // Create a pod with labels pod := &v1.Pod{ diff --git a/test/integration/dualstack/dualstack_test.go b/test/integration/dualstack/dualstack_test.go index 4ec8f72bc84..70e9a356b3e 100644 --- a/test/integration/dualstack/dualstack_test.go +++ b/test/integration/dualstack/dualstack_test.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "fmt" - "net" "reflect" "strings" "testing" @@ -36,9 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/test/integration/framework" netutils "k8s.io/utils/net" ) @@ -48,19 +45,15 @@ func TestCreateServiceSingleStackIPv4(t *testing.T) { // Create an IPv4 single stack control-plane serviceCIDR := "10.0.0.0/16" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = serviceCIDR + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -239,7 +232,7 @@ func TestCreateServiceSingleStackIPv4(t *testing.T) { } // create the service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if (err != nil) != tc.expectError { t.Errorf("Test failed expected result: %v received %v ", tc.expectError, err) } @@ -262,23 +255,18 @@ func TestCreateServiceSingleStackIPv4(t *testing.T) { // TestCreateServiceDualStackIPv6 test the Service dualstackness in an IPv6 only DualStack cluster func TestCreateServiceDualStackIPv6(t *testing.T) { // Create an IPv6 only dual stack control-plane - serviceCIDR := "2001:db8:1::/48" + serviceCIDR := "2001:db8:1::/112" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - cfg.GenericConfig.PublicAddress = netutils.ParseIPSloppy("2001:db8::10") - - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = serviceCIDR + opts.GenericServerRunOptions.AdvertiseAddress = netutils.ParseIPSloppy("2001:db8::10") + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -459,7 +447,7 @@ func TestCreateServiceDualStackIPv6(t *testing.T) { } // create the service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if (err != nil) != tc.expectError { t.Errorf("Test failed expected result: %v received %v ", tc.expectError, err) } @@ -483,28 +471,17 @@ func TestCreateServiceDualStackIPv6(t *testing.T) { func TestCreateServiceDualStackIPv4IPv6(t *testing.T) { // Create an IPv4IPv6 dual stack control-plane serviceCIDR := "10.0.0.0/16" - secondaryServiceCIDR := "2001:db8:1::/48" + secondaryServiceCIDR := "2001:db8:1::/112" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - - _, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr - - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR) + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -684,7 +661,7 @@ func TestCreateServiceDualStackIPv4IPv6(t *testing.T) { } // create a service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if (err != nil) != tc.expectError { t.Errorf("Test failed expected result: %v received %v ", tc.expectError, err) } @@ -708,30 +685,19 @@ func TestCreateServiceDualStackIPv4IPv6(t *testing.T) { // TestCreateServiceDualStackIPv6IPv4 test the Service dualstackness in a IPv6IPv4 DualStack cluster func TestCreateServiceDualStackIPv6IPv4(t *testing.T) { // Create an IPv6IPv4 dual stack control-plane - serviceCIDR := "2001:db8:1::/48" + serviceCIDR := "2001:db8:1::/112" secondaryServiceCIDR := "10.0.0.0/16" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - cfg.GenericConfig.PublicAddress = netutils.ParseIPSloppy("2001:db8::10") - - _, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr - - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR) + opts.GenericServerRunOptions.AdvertiseAddress = netutils.ParseIPSloppy("2001:db8::10") + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -743,7 +709,7 @@ func TestCreateServiceDualStackIPv6IPv4(t *testing.T) { // verify client is working if err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) { - _, err = client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) + _, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil { t.Logf("error fetching endpoints: %v", err) return false, nil @@ -914,7 +880,7 @@ func TestCreateServiceDualStackIPv6IPv4(t *testing.T) { } // create a service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if (err != nil) != tc.expectError { t.Errorf("Test failed expected result: %v received %v ", tc.expectError, err) } @@ -939,28 +905,17 @@ func TestCreateServiceDualStackIPv6IPv4(t *testing.T) { func TestUpgradeDowngrade(t *testing.T) { // Create an IPv4IPv6 dual stack control-plane serviceCIDR := "10.0.0.0/16" - secondaryServiceCIDR := "2001:db8:1::/48" + secondaryServiceCIDR := "2001:db8:1::/112" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - - _, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr - - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR) + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -988,7 +943,7 @@ func TestUpgradeDowngrade(t *testing.T) { } // create a service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { t.Fatalf("unexpected error while creating service:%v", err) } @@ -1053,28 +1008,17 @@ func TestUpgradeDowngrade(t *testing.T) { func TestConvertToFromExternalName(t *testing.T) { // Create an IPv4IPv6 dual stack control-plane serviceCIDR := "10.0.0.0/16" - secondaryServiceCIDR := "2001:db8:1::/48" + secondaryServiceCIDR := "2001:db8:1::/112" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - - _, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr - - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR) + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -1101,7 +1045,7 @@ func TestConvertToFromExternalName(t *testing.T) { } // create a service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { t.Fatalf("unexpected error while creating service:%v", err) } @@ -1145,28 +1089,17 @@ func TestConvertToFromExternalName(t *testing.T) { func TestPreferDualStack(t *testing.T) { // Create an IPv4IPv6 dual stack control-plane serviceCIDR := "10.0.0.0/16" - secondaryServiceCIDR := "2001:db8:1::/48" + secondaryServiceCIDR := "2001:db8:1::/112" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - - _, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr - - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR) + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -1197,7 +1130,7 @@ func TestPreferDualStack(t *testing.T) { } // create a service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { t.Fatalf("unexpected error while creating service:%v", err) } @@ -1231,19 +1164,15 @@ func TestServiceUpdate(t *testing.T) { // Create an IPv4 single stack control-plane serviceCIDR := "10.0.0.0/16" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = serviceCIDR + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -1270,7 +1199,7 @@ func TestServiceUpdate(t *testing.T) { } // create the service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) // if no error was expected validate the service otherwise return if err != nil { t.Errorf("unexpected error creating service:%v", err) @@ -1392,21 +1321,20 @@ func validateServiceAndClusterIPFamily(svc *v1.Service, expectedIPFamilies []v1. } func TestUpgradeServicePreferToDualStack(t *testing.T) { + sharedEtcd := framework.SharedEtcd() + // Create an IPv4 only dual stack control-plane serviceCIDR := "192.168.0.0/24" - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - _, s, closeFn := framework.RunAnAPIServer(cfg) - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.Etcd.StorageConfig = *sharedEtcd + opts.ServiceClusterIPRanges = serviceCIDR + }, + }) // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -1438,7 +1366,7 @@ func TestUpgradeServicePreferToDualStack(t *testing.T) { } // create the service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1452,18 +1380,17 @@ func TestUpgradeServicePreferToDualStack(t *testing.T) { } // reconfigure the apiserver to be dual-stack - closeFn() + tearDownFn() - secondaryServiceCIDR := "2001:db8:1::/48" - _, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr - _, s, closeFn = framework.RunAnAPIServer(cfg) - defer closeFn() + secondaryServiceCIDR := "2001:db8:1::/112" - client = clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn = framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.Etcd.StorageConfig = *sharedEtcd + opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR) + }, + }) + defer tearDownFn() // Wait until the default "kubernetes" service is created. if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { @@ -1487,25 +1414,21 @@ func TestUpgradeServicePreferToDualStack(t *testing.T) { } func TestDowngradeServicePreferToDualStack(t *testing.T) { + sharedEtcd := framework.SharedEtcd() + // Create a dual stack control-plane serviceCIDR := "192.168.0.0/24" - secondaryServiceCIDR := "2001:db8:1::/48" + secondaryServiceCIDR := "2001:db8:1::/112" + + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.Etcd.StorageConfig = *sharedEtcd + opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR) + }, + }) - dualStackCfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - dualStackCfg.ExtraConfig.ServiceIPRange = *cidr - _, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - dualStackCfg.ExtraConfig.SecondaryServiceIPRange = *secCidr - _, s, closeFn := framework.RunAnAPIServer(dualStackCfg) - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) // Wait until the default "kubernetes" service is created. - if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return false, err @@ -1535,7 +1458,7 @@ func TestDowngradeServicePreferToDualStack(t *testing.T) { }, } // create the service - _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1548,14 +1471,17 @@ func TestDowngradeServicePreferToDualStack(t *testing.T) { t.Fatalf("Unexpected error validating the service %s %v", svc.Name, err) } // reconfigure the apiserver to be sinlge stack - closeFn() - // reset secondary - var emptyCidr net.IPNet - dualStackCfg.ExtraConfig.SecondaryServiceIPRange = emptyCidr + tearDownFn() + + // reset secondary + client, _, tearDownFn = framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.Etcd.StorageConfig = *sharedEtcd + opts.ServiceClusterIPRanges = serviceCIDR + }, + }) + defer tearDownFn() - _, s, closeFn = framework.RunAnAPIServer(dualStackCfg) - defer closeFn() - client = clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) // Wait until the default "kubernetes" service is created. if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) @@ -1587,18 +1513,11 @@ type specMergePatch struct { // tests success when converting ClusterIP:Headless service to ExternalName func Test_ServiceChangeTypeHeadlessToExternalNameWithPatch(t *testing.T) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - _, server, closeFn := framework.RunAnAPIServer(controlPlaneConfig) - defer closeFn() + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{}) + defer tearDownFn() - config := restclient.Config{Host: server.URL} - client, err := clientset.NewForConfig(&config) - if err != nil { - t.Fatalf("Error creating clientset: %v", err) - } - - ns := framework.CreateTestingNamespace("test-service-allocate-node-ports", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(client, "test-service-allocate-node-ports", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -1611,6 +1530,7 @@ func Test_ServiceChangeTypeHeadlessToExternalNameWithPatch(t *testing.T) { }, } + var err error service, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{}) if err != nil { t.Fatalf("Error creating test service: %v", err) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 5c3da1096c6..837f0884958 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -47,6 +47,7 @@ import ( "k8s.io/client-go/util/retry" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/controller-manager/pkg/informerfactory" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/garbagecollector" jobcontroller "k8s.io/kubernetes/pkg/controller/job" @@ -586,9 +587,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(restConfig) - defer func() { - cancel() - }() + defer cancel() // Job tracking with finalizers requires less calls in Indexed mode, // so it's more likely to process all finalizers before all the pods @@ -785,9 +784,7 @@ func TestSuspendJobControllerRestart(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "suspend") defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(restConfig) - defer func() { - cancel() - }() + defer cancel() job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ @@ -1162,24 +1159,23 @@ func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, n } func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Config, clientset.Interface, *v1.Namespace) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - _, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig) + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) - config := restclient.Config{ - Host: server.URL, - QPS: 200.0, - Burst: 200, - } - clientSet, err := clientset.NewForConfig(&config) + config := restclient.CopyConfig(server.ClientConfig) + config.QPS = 200 + config.Burst = 200 + clientSet, err := clientset.NewForConfig(config) if err != nil { t.Fatalf("Error creating clientset: %v", err) } - ns := framework.CreateTestingNamespace(nsBaseName, t) + + ns := framework.CreateNamespaceOrDie(clientSet, nsBaseName, t) closeFn := func() { - framework.DeleteTestingNamespace(ns, t) - apiServerCloseFn() + framework.DeleteNamespaceOrDie(clientSet, ns, t) + server.TearDownFn() } - return closeFn, &config, clientSet, ns + return closeFn, config, clientSet, ns } func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context.Context, context.CancelFunc) { diff --git a/test/integration/scheduler/extender/extender_test.go b/test/integration/scheduler/extender/extender_test.go index 96e39cf3638..76698aa40ae 100644 --- a/test/integration/scheduler/extender/extender_test.go +++ b/test/integration/scheduler/extender/extender_test.go @@ -353,7 +353,7 @@ func TestSchedulerExtender(t *testing.T) { }, } - testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, scheduler.WithExtenders(extenders...)) + testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, scheduler.WithExtenders(extenders...)) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) defer testutils.CleanupTest(t, testCtx) diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index b2b944213ba..57c49135d2f 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -1489,7 +1489,7 @@ func TestBindPlugin(t *testing.T) { }) // Create the scheduler with the test plugin set. - testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, + testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, 0, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) testutils.SyncInformerFactory(testCtx) @@ -2421,7 +2421,7 @@ func TestActivatePods(t *testing.T) { } func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext { - testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, opts...) + testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index 7f96768daa6..cc68a8bc0e1 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -171,6 +171,7 @@ func TestPreemption(t *testing.T) { testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, "preemption", nil), + 0, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) testutils.SyncInformerFactory(testCtx) @@ -1436,7 +1437,7 @@ func TestPDBInPreemption(t *testing.T) { } func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext { - testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), opts...) + testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), 0, opts...) testutils.SyncInformerFactory(testCtx) // wraps the NextPod() method to make it appear the preemption has been done already and the nominated node has been set. f := testCtx.Scheduler.NextPod diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 880890f7b6e..8c9364deb4f 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -56,6 +56,7 @@ func TestCoreResourceEnqueue(t *testing.T) { testCtx := testutils.InitTestSchedulerWithOptions( t, testutils.InitTestAPIServer(t, "core-res-enqueue", nil), + 0, scheduler.WithPodInitialBackoffSeconds(0), scheduler.WithPodMaxBackoffSeconds(0), ) @@ -236,6 +237,7 @@ func TestCustomResourceEnqueue(t *testing.T) { testCtx = testutils.InitTestSchedulerWithOptions( t, testCtx, + 0, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), scheduler.WithPodInitialBackoffSeconds(0), diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 5e2a6112b31..c81b192d2c2 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -267,7 +267,7 @@ func TestMultipleSchedulers(t *testing.T) { }}, }, }) - testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, scheduler.WithProfiles(cfg.Profiles...)) + testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, scheduler.WithProfiles(cfg.Profiles...)) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) diff --git a/test/integration/scheduler/scoring/priorities_test.go b/test/integration/scheduler/scoring/priorities_test.go index ec866e22b66..c757d6bbaae 100644 --- a/test/integration/scheduler/scoring/priorities_test.go +++ b/test/integration/scheduler/scoring/priorities_test.go @@ -92,6 +92,7 @@ func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *tes testCtx := testutils.InitTestSchedulerWithOptions( t, testutils.InitTestAPIServer(t, strings.ToLower(scorePluginName), nil), + 0, scheduler.WithProfiles(cfg.Profiles...), ) testutils.SyncInformerFactory(testCtx) @@ -127,6 +128,7 @@ func initTestSchedulerForNodeResourcesTest(t *testing.T) *testutils.TestContext testCtx := testutils.InitTestSchedulerWithOptions( t, testutils.InitTestAPIServer(t, strings.ToLower(noderesources.Name), nil), + 0, scheduler.WithProfiles(cfg.Profiles...), ) testutils.SyncInformerFactory(testCtx) diff --git a/test/integration/statefulset/statefulset_test.go b/test/integration/statefulset/statefulset_test.go index 34ab4140e58..2e2d99c14c3 100644 --- a/test/integration/statefulset/statefulset_test.go +++ b/test/integration/statefulset/statefulset_test.go @@ -37,6 +37,7 @@ import ( apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/statefulset" + "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" ) @@ -49,10 +50,7 @@ const ( // TestVolumeTemplateNoopUpdate ensures embedded StatefulSet objects with embedded PersistentVolumes can be updated func TestVolumeTemplateNoopUpdate(t *testing.T) { // Start the server with default storage setup - server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd()) - if err != nil { - t.Fatal(err) - } + server := apiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) defer server.TearDownFn() c, err := dynamic.NewForConfig(server.ClientConfig) @@ -125,8 +123,8 @@ func TestVolumeTemplateNoopUpdate(t *testing.T) { func TestSpecReplicasChange(t *testing.T) { closeFn, rm, informers, c := scSetup(t) defer closeFn() - ns := framework.CreateTestingNamespace("test-spec-replicas-change", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t) + defer framework.DeleteNamespaceOrDie(c, ns, t) cancel := runControllerAndInformers(rm, informers) defer cancel() @@ -168,8 +166,8 @@ func TestSpecReplicasChange(t *testing.T) { func TestDeletingAndFailedPods(t *testing.T) { closeFn, rm, informers, c := scSetup(t) defer closeFn() - ns := framework.CreateTestingNamespace("test-deleting-and-failed-pods", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t) + defer framework.DeleteNamespaceOrDie(c, ns, t) cancel := runControllerAndInformers(rm, informers) defer cancel() @@ -269,8 +267,8 @@ func TestStatefulSetAvailable(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, test.enabled)() closeFn, rm, informers, c := scSetup(t) defer closeFn() - ns := framework.CreateTestingNamespace("test-available-pods", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(c, "test-available-pods", t) + defer framework.DeleteNamespaceOrDie(c, ns, t) cancel := runControllerAndInformers(rm, informers) defer cancel() @@ -358,31 +356,28 @@ func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1 // add for issue: https://github.com/kubernetes/kubernetes/issues/108837 func TestStatefulSetStatusWithPodFail(t *testing.T) { limitedPodNumber := 2 - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.AdmissionControl = &fakePodFailAdmission{ - limitedPodNumber: limitedPodNumber, - } - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) + c, config, closeFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.AdmissionControl = &fakePodFailAdmission{ + limitedPodNumber: limitedPodNumber, + } + }, + }) defer closeFn() - config := restclient.Config{Host: s.URL} - c, err := clientset.NewForConfig(&config) - if err != nil { - t.Fatalf("Could not create clientset: %v", err) - } resyncPeriod := 12 * time.Hour - informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod) + informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod) ssc := statefulset.NewStatefulSetController( informers.Core().V1().Pods(), informers.Apps().V1().StatefulSets(), informers.Core().V1().PersistentVolumeClaims(), informers.Apps().V1().ControllerRevisions(), - clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")), + clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")), ) - ns := framework.CreateTestingNamespace("test-pod-fail", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(c, "test-pod-fail", t) + defer framework.DeleteNamespaceOrDie(c, ns, t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -390,7 +385,7 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) { go ssc.Run(ctx, 5) sts := newSTS("sts", ns.Name, 4) - _, err = c.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{}) + _, err := c.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{}) if err != nil { t.Fatalf("Could not create statefuleSet %s: %v", sts.Name, err) } diff --git a/test/integration/statefulset/util.go b/test/integration/statefulset/util.go index 1f81c289ac4..85ef679439c 100644 --- a/test/integration/statefulset/util.go +++ b/test/integration/statefulset/util.go @@ -35,6 +35,7 @@ import ( typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/util/retry" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" api "k8s.io/kubernetes/pkg/apis/core" //svc "k8s.io/kubernetes/pkg/api/v1/service" @@ -159,27 +160,27 @@ func newStatefulSetPVC(name string) v1.PersistentVolumeClaim { } // scSetup sets up necessities for Statefulset integration test, including control plane, apiserver, informers, and clientset -func scSetup(t *testing.T) (framework.CloseFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) { - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - _, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig) +func scSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) - config := restclient.Config{Host: s.URL} - clientSet, err := clientset.NewForConfig(&config) + config := restclient.CopyConfig(server.ClientConfig) + clientSet, err := clientset.NewForConfig(config) if err != nil { t.Fatalf("error in create clientset: %v", err) } resyncPeriod := 12 * time.Hour - informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod) + informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod) sc := statefulset.NewStatefulSetController( informers.Core().V1().Pods(), informers.Apps().V1().StatefulSets(), informers.Core().V1().PersistentVolumeClaims(), informers.Apps().V1().ControllerRevisions(), - clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")), + clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")), ) - return closeFn, sc, informers, clientSet + return server.TearDownFn, sc, informers, clientSet } // Run STS controller and informers diff --git a/test/integration/util/util.go b/test/integration/util/util.go index eee74da29fe..27dfbd37a16 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -342,7 +342,7 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf testCtx.ClientSet, testCtx.KubeConfig, testCtx.CloseFn = framework.StartTestServer(t, framework.TestServerSetup{ ModifyServerRunOptions: func(options *options.ServerRunOptions) { - options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"} + options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority", "StorageObjectInUseProtection"} }, ModifyServerConfig: func(config *controlplane.Config) { if admission != nil { @@ -380,7 +380,7 @@ func InitTestScheduler( testCtx *TestContext, ) *TestContext { // Pod preemption is enabled by default scheduler configuration. - return InitTestSchedulerWithOptions(t, testCtx) + return InitTestSchedulerWithOptions(t, testCtx, 0) } // InitTestSchedulerWithOptions initializes a test environment and creates a scheduler with default @@ -388,10 +388,11 @@ func InitTestScheduler( func InitTestSchedulerWithOptions( t *testing.T, testCtx *TestContext, + resyncPeriod time.Duration, opts ...scheduler.Option, ) *TestContext { // 1. Create scheduler - testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, 0) + testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, resyncPeriod) if testCtx.KubeConfig != nil { dynClient := dynamic.NewForConfigOrDie(testCtx.KubeConfig) testCtx.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil) @@ -489,7 +490,7 @@ func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.Di // InitTestSchedulerWithNS initializes a test environment and creates API server and scheduler with default // configuration. func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext { - testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), opts...) + testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), 0, opts...) SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) return testCtx @@ -512,6 +513,7 @@ func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext { }) testCtx := InitTestSchedulerWithOptions( t, InitTestAPIServer(t, nsPrefix, nil), + 0, scheduler.WithProfiles(cfg.Profiles...)) SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) diff --git a/test/integration/volumescheduling/util.go b/test/integration/volumescheduling/util.go index 8b960ce94c9..90f321b291a 100644 --- a/test/integration/volumescheduling/util.go +++ b/test/integration/volumescheduling/util.go @@ -18,125 +18,15 @@ package volumescheduling import ( "context" - "net/http/httptest" - "testing" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apiserver/pkg/admission" - "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - "k8s.io/client-go/tools/events" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/controlplane" - "k8s.io/kubernetes/pkg/scheduler" - "k8s.io/kubernetes/pkg/scheduler/profile" - "k8s.io/kubernetes/test/integration/framework" ) -type testContext struct { - closeFn framework.CloseFunc - httpServer *httptest.Server - ns *v1.Namespace - clientSet *clientset.Clientset - informerFactory informers.SharedInformerFactory - scheduler *scheduler.Scheduler - - ctx context.Context - cancelFn context.CancelFunc -} - -// initTestAPIServer initializes a test environment and creates an API server with default -// configuration. Alpha resources are enabled automatically if the corresponding feature -// is enabled. -func initTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *testContext { - ctx, cancelFunc := context.WithCancel(context.Background()) - testCtx := testContext{ - ctx: ctx, - cancelFn: cancelFunc, - } - - // 1. Create API server - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - resourceConfig := controlplane.DefaultAPIResourceConfigSource() - controlPlaneConfig.ExtraConfig.APIResourceConfigSource = resourceConfig - - if admission != nil { - controlPlaneConfig.GenericConfig.AdmissionControl = admission - } - - _, testCtx.httpServer, testCtx.closeFn = framework.RunAnAPIServer(controlPlaneConfig) - s := testCtx.httpServer - - if nsPrefix != "default" { - testCtx.ns = framework.CreateTestingNamespace(nsPrefix+string(uuid.NewUUID()), t) - } else { - testCtx.ns = framework.CreateTestingNamespace("default", t) - } - - // 2. Create kubeclient - testCtx.clientSet = clientset.NewForConfigOrDie( - &restclient.Config{ - QPS: -1, Host: s.URL, - ContentConfig: restclient.ContentConfig{ - GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}, - }, - }, - ) - return &testCtx -} - -// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default -// configuration and other options. -func initTestSchedulerWithOptions( - t *testing.T, - testCtx *testContext, - resyncPeriod time.Duration, -) *testContext { - // 1. Create scheduler - testCtx.informerFactory = informers.NewSharedInformerFactory(testCtx.clientSet, resyncPeriod) - - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: testCtx.clientSet.EventsV1(), - }) - - var err error - testCtx.scheduler, err = scheduler.New( - testCtx.clientSet, - testCtx.informerFactory, - nil, - profile.NewRecorderFactory(eventBroadcaster), - testCtx.ctx.Done()) - - if err != nil { - t.Fatalf("Couldn't create scheduler: %v", err) - } - - eventBroadcaster.StartRecordingToSink(testCtx.ctx.Done()) - - testCtx.informerFactory.Start(testCtx.scheduler.StopEverything) - testCtx.informerFactory.WaitForCacheSync(testCtx.scheduler.StopEverything) - - go testCtx.scheduler.Run(testCtx.ctx) - return testCtx -} - -// cleanupTest deletes the scheduler and the test namespace. It should be called -// at the end of a test. -func cleanupTest(t *testing.T, testCtx *testContext) { - // Kill the scheduler. - testCtx.cancelFn() - // Cleanup nodes. - testCtx.clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) - framework.DeleteTestingNamespace(testCtx.ns, t) - testCtx.closeFn() -} - // waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns // an error if it does not scheduled within the given timeout. func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { diff --git a/test/integration/volumescheduling/volume_binding_test.go b/test/integration/volumescheduling/volume_binding_test.go index 99b2bd49f85..93a33a7ed12 100644 --- a/test/integration/volumescheduling/volume_binding_test.go +++ b/test/integration/volumescheduling/volume_binding_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" + testutil "k8s.io/kubernetes/test/integration/util" imageutils "k8s.io/kubernetes/test/utils/image" ) @@ -990,19 +991,16 @@ func TestCapacity(t *testing.T) { // selectedNode annotation from a claim to reschedule volume provision // on provision failure. func TestRescheduleProvisioning(t *testing.T) { - // Set feature gates - ctx, cancel := context.WithCancel(context.Background()) + testCtx := testutil.InitTestAPIServer(t, "reschedule-volume-provision", nil) - testCtx := initTestAPIServer(t, "reschedule-volume-provision", nil) - - clientset := testCtx.clientSet - ns := testCtx.ns.Name + clientset := testCtx.ClientSet + ns := testCtx.NS.Name defer func() { - cancel() + testCtx.CancelFn() deleteTestObjects(clientset, ns, metav1.DeleteOptions{}) - testCtx.clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) - testCtx.closeFn() + testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) + testCtx.CloseFn() }() ctrl, informerFactory, err := initPVController(t, testCtx, 0) @@ -1038,9 +1036,9 @@ func TestRescheduleProvisioning(t *testing.T) { } // Start controller. - go ctrl.Run(ctx) - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) + go ctrl.Run(testCtx.Ctx) + informerFactory.Start(testCtx.Ctx.Done()) + informerFactory.WaitForCacheSync(testCtx.Ctx.Done()) // Validate that the annotation is removed by controller for provision reschedule. if err := waitForProvisionAnn(clientset, pvc, false); err != nil { @@ -1049,18 +1047,21 @@ func TestRescheduleProvisioning(t *testing.T) { } func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig { - testCtx := initTestSchedulerWithOptions(t, initTestAPIServer(t, nsName, nil), resyncPeriod) - clientset := testCtx.clientSet - ns := testCtx.ns.Name + testCtx := testutil.InitTestSchedulerWithOptions(t, testutil.InitTestAPIServer(t, nsName, nil), resyncPeriod) + testutil.SyncInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.Ctx) + + clientset := testCtx.ClientSet + ns := testCtx.NS.Name ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds) if err != nil { t.Fatalf("Failed to create PV controller: %v", err) } - go ctrl.Run(testCtx.ctx) + go ctrl.Run(testCtx.Ctx) // Start informer factory after all controllers are configured and running. - informerFactory.Start(testCtx.ctx.Done()) - informerFactory.WaitForCacheSync(testCtx.ctx.Done()) + informerFactory.Start(testCtx.Ctx.Done()) + informerFactory.WaitForCacheSync(testCtx.Ctx.Done()) // Create shared objects // Create nodes @@ -1081,17 +1082,17 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod t return &testConfig{ client: clientset, ns: ns, - stop: testCtx.ctx.Done(), + stop: testCtx.Ctx.Done(), teardown: func() { klog.Infof("test cluster %q start to tear down", ns) deleteTestObjects(clientset, ns, metav1.DeleteOptions{}) - cleanupTest(t, testCtx) + testutil.CleanupTest(t, testCtx) }, } } -func initPVController(t *testing.T, testCtx *testContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) { - clientset := testCtx.clientSet +func initPVController(t *testing.T, testCtx *testutil.TestContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) { + clientset := testCtx.ClientSet // Informers factory for controllers informerFactory := informers.NewSharedInformerFactory(clientset, 0) diff --git a/test/integration/volumescheduling/volume_capacity_priority_test.go b/test/integration/volumescheduling/volume_capacity_priority_test.go index dff08f1739e..9122b18d203 100644 --- a/test/integration/volumescheduling/volume_capacity_priority_test.go +++ b/test/integration/volumescheduling/volume_capacity_priority_test.go @@ -31,6 +31,7 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" + testutil "k8s.io/kubernetes/test/integration/util" ) var ( @@ -46,28 +47,31 @@ func mergeNodeLabels(node *v1.Node, labels map[string]string) *v1.Node { } func setupClusterForVolumeCapacityPriority(t *testing.T, nsName string, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig { - textCtx := initTestSchedulerWithOptions(t, initTestAPIServer(t, nsName, nil), resyncPeriod) - clientset := textCtx.clientSet - ns := textCtx.ns.Name + testCtx := testutil.InitTestSchedulerWithOptions(t, testutil.InitTestAPIServer(t, nsName, nil), resyncPeriod) + testutil.SyncInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.Ctx) - ctrl, informerFactory, err := initPVController(t, textCtx, provisionDelaySeconds) + clientset := testCtx.ClientSet + ns := testCtx.NS.Name + + ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds) if err != nil { t.Fatalf("Failed to create PV controller: %v", err) } - go ctrl.Run(textCtx.ctx) + go ctrl.Run(testCtx.Ctx) // Start informer factory after all controllers are configured and running. - informerFactory.Start(textCtx.ctx.Done()) - informerFactory.WaitForCacheSync(textCtx.ctx.Done()) + informerFactory.Start(testCtx.Ctx.Done()) + informerFactory.WaitForCacheSync(testCtx.Ctx.Done()) return &testConfig{ client: clientset, ns: ns, - stop: textCtx.ctx.Done(), + stop: testCtx.Ctx.Done(), teardown: func() { klog.Infof("test cluster %q start to tear down", ns) deleteTestObjects(clientset, ns, metav1.DeleteOptions{}) - cleanupTest(t, textCtx) + testutil.CleanupTest(t, testCtx) }, } }