From 783da34f543c0092fe5768e1bcfee4ae378d0500 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 30 May 2022 20:30:58 +0200 Subject: [PATCH 1/6] Clean shutdown of disruption integration tests --- pkg/controller/disruption/disruption.go | 17 +++++--- .../integration/disruption/disruption_test.go | 41 +++++++++++-------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 3c8fcbab770..018157727b7 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -358,7 +358,18 @@ func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, func (dc *DisruptionController) Run(ctx context.Context) { defer utilruntime.HandleCrash() + + // Start events processing pipeline. + if dc.kubeClient != nil { + klog.Infof("Sending events to api server.") + dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")}) + } else { + klog.Infof("No api server defined - no events will be sent to API server.") + } + defer dc.broadcaster.Shutdown() + defer dc.queue.ShutDown() + defer dc.recheckQueue.ShutDown() klog.Infof("Starting disruption controller") defer klog.Infof("Shutting down disruption controller") @@ -367,12 +378,6 @@ func (dc *DisruptionController) Run(ctx context.Context) { return } - if dc.kubeClient != nil { - klog.Infof("Sending events to api server.") - dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")}) - } else { - klog.Infof("No api server defined - no events will be sent to API server.") - } go wait.UntilWithContext(ctx, dc.worker, time.Second) go wait.Until(dc.recheckWorker, time.Second, ctx.Done()) diff --git a/test/integration/disruption/disruption_test.go b/test/integration/disruption/disruption_test.go index ca7fc8b31e0..a810d45d5e6 100644 --- a/test/integration/disruption/disruption_test.go +++ b/test/integration/disruption/disruption_test.go @@ -101,7 +101,9 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti func TestPDBWithScaleSubresource(t *testing.T) { s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t) defer s.TearDownFn() - ctx := context.TODO() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() nsName := "pdb-scale-subresource" createNs(ctx, t, nsName, clientSet) @@ -187,16 +189,14 @@ func TestPDBWithScaleSubresource(t *testing.T) { } func TestEmptySelector(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() testcases := []struct { name string - createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error + createPDBFunc func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error expectedCurrentHealthy int32 }{ { name: "v1beta1 should not target any pods", - createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { + createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { pdb := &v1beta1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -213,7 +213,7 @@ func TestEmptySelector(t *testing.T) { }, { name: "v1 should target all pods", - createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable 
intstr.IntOrString) error { + createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { pdb := &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -235,6 +235,9 @@ func TestEmptySelector(t *testing.T) { s, pdbc, informers, clientSet, _, _ := setup(t) defer s.TearDownFn() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nsName := fmt.Sprintf("pdb-empty-selector-%d", i) createNs(ctx, t, nsName, clientSet) @@ -252,7 +255,7 @@ func TestEmptySelector(t *testing.T) { waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning) pdbName := "test-pdb" - if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil { + if err := tc.createPDBFunc(ctx, clientSet, pdbName, nsName, minAvailable); err != nil { t.Errorf("Error creating PodDisruptionBudget: %v", err) } @@ -271,16 +274,14 @@ func TestEmptySelector(t *testing.T) { } func TestSelectorsForPodsWithoutLabels(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() testcases := []struct { name string - createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error + createPDBFunc func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error expectedCurrentHealthy int32 }{ { name: "pods with no labels can be targeted by v1 PDBs with empty selector", - createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { + createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { pdb := &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -297,7 +298,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) { }, { name: "pods with no labels can be targeted by v1 PDBs with DoesNotExist selector", - createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { + createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { pdb := &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -321,7 +322,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) { }, { name: "pods with no labels can be targeted by v1beta1 PDBs with DoesNotExist selector", - createPDBFunc: func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { + createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error { pdb := &v1beta1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -350,6 +351,9 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) { s, pdbc, informers, clientSet, _, _ := setup(t) defer s.TearDownFn() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nsName := fmt.Sprintf("pdb-selectors-%d", i) createNs(ctx, t, nsName, clientSet) @@ -360,7 +364,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) { // Create the PDB first and wait for it to settle. 
pdbName := "test-pdb" - if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil { + if err := tc.createPDBFunc(ctx, clientSet, pdbName, nsName, minAvailable); err != nil { t.Errorf("Error creating PodDisruptionBudget: %v", err) } waitPDBStable(ctx, t, clientSet, 0, nsName, pdbName) @@ -498,9 +502,15 @@ func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podN } func TestPatchCompatibility(t *testing.T) { - s, _, _, clientSet, _, _ := setup(t) + s, pdbc, _, clientSet, _, _ := setup(t) defer s.TearDownFn() + // Even though pdbc isn't used in this test, its creation is already + // spawning some goroutines. So we need to run it to ensure they won't leak. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + pdbc.Run(ctx) + testcases := []struct { name string version string @@ -634,5 +644,4 @@ func TestPatchCompatibility(t *testing.T) { } }) } - } From d9d46d53263d74eccfee23ebcb35d3d492170382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 30 May 2022 20:42:00 +0200 Subject: [PATCH 2/6] Clean shutdown of certificates integration tests --- pkg/controller/certificates/certificate_controller.go | 7 ------- test/integration/certificates/duration_test.go | 6 +++--- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index 7bb8b2fdb1a..64798f2ae1c 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -31,10 +31,8 @@ import ( "k8s.io/apimachinery/pkg/util/wait" certificatesinformers "k8s.io/client-go/informers/certificates/v1" clientset "k8s.io/client-go/kubernetes" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" certificateslisters "k8s.io/client-go/listers/certificates/v1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" @@ -60,11 +58,6 @@ func NewCertificateController( csrInformer certificatesinformers.CertificateSigningRequestInformer, handler func(context.Context, *certificates.CertificateSigningRequest) error, ) *CertificateController { - // Send events to the apiserver - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - cc := &CertificateController{ name: name, kubeClient: kubeClient, diff --git a/test/integration/certificates/duration_test.go b/test/integration/certificates/duration_test.go index 12543e0dedf..e7511a02de6 100644 --- a/test/integration/certificates/duration_test.go +++ b/test/integration/certificates/duration_test.go @@ -51,12 +51,12 @@ import ( func TestCSRDuration(t *testing.T) { t.Parallel() - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - s := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) t.Cleanup(s.TearDownFn) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + // assert that the metrics we collect during the test run match expectations // we have 7 valid test cases below that request a duration of which 6 should have their duration honored wantMetricStrings := []string{ From e5ec28ff86b4e22b6959dfeace620c4a7f963cb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 2 Jun 2022 
18:14:06 +0200 Subject: [PATCH 3/6] Clean shutdown of network integration tests --- test/integration/network/services_test.go | 28 ++++++++++------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/test/integration/network/services_test.go b/test/integration/network/services_test.go index 2d2b3480931..19e3c9a30e2 100644 --- a/test/integration/network/services_test.go +++ b/test/integration/network/services_test.go @@ -26,9 +26,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - netutils "k8s.io/utils/net" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/test/integration/framework" ) @@ -38,26 +37,23 @@ import ( // mistakenly, repair the ClusterIP assigned to the Service that is being deleted. // https://issues.k8s.io/87603 func TestServicesFinalizersRepairLoop(t *testing.T) { - serviceCIDR := "10.0.0.0/16" clusterIP := "10.0.0.20" interval := 5 * time.Second - cfg := framework.NewIntegrationTestControlPlaneConfig() - _, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR) - if err != nil { - t.Fatalf("bad cidr: %v", err) - } - cfg.ExtraConfig.ServiceIPRange = *cidr - cfg.ExtraConfig.RepairServicesInterval = interval - _, s, closeFn := framework.RunAnAPIServer(cfg) - defer closeFn() - - client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL}) + client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.ServiceClusterIPRanges = serviceCIDR + }, + ModifyServerConfig: func(cfg *controlplane.Config) { + cfg.ExtraConfig.RepairServicesInterval = interval + }, + }) + defer tearDownFn() // verify client is working if err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) { - _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) + _, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) if err != nil { t.Logf("error fetching endpoints: %v", err) return false, nil From b54363aceb03f33ac1d2b562e07c0ed714776c90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 2 Jun 2022 20:21:50 +0200 Subject: [PATCH 4/6] Clean shutdown of storageversion integration tests --- pkg/controlplane/instance.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 20a16c9a53e..f3935b63e53 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -58,7 +58,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/endpoints/discovery" apiserverfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/generic" @@ -477,7 +476,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) time.Duration(c.ExtraConfig.IdentityLeaseRenewIntervalSeconds)*time.Second, metav1.NamespaceSystem, labelAPIServerHeartbeat) - go controller.Run(wait.NeverStop) + go controller.Run(hookContext.StopCh) return nil }) m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) 
error { @@ -490,7 +489,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) time.Duration(c.ExtraConfig.IdentityLeaseDurationSeconds)*time.Second, metav1.NamespaceSystem, KubeAPIServerIdentityLeaseLabelSelector, - ).Run(wait.NeverStop) + ).Run(hookContext.StopCh) return nil }) } From 2af8d0bbd77d4dc1564608fd14df2eb122ab600c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 2 Jun 2022 20:42:11 +0200 Subject: [PATCH 5/6] Clean shutdown of events integration tests --- test/integration/events/events_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/integration/events/events_test.go b/test/integration/events/events_test.go index bce66117840..9a6f3c18f11 100644 --- a/test/integration/events/events_test.go +++ b/test/integration/events/events_test.go @@ -58,13 +58,17 @@ func TestEventCompatibility(t *testing.T) { if err != nil { t.Fatal(err) } + stopCh := make(chan struct{}) + defer close(stopCh) oldBroadcaster := record.NewBroadcaster() + defer oldBroadcaster.Shutdown() oldRecorder := oldBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "integration"}) oldBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{Interface: client.CoreV1().Events("")}) oldRecorder.Eventf(regarding, v1.EventTypeNormal, "started", "note") newBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) + defer newBroadcaster.Shutdown() newRecorder := newBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-scheduler") newBroadcaster.StartRecordingToSink(stopCh) newRecorder.Eventf(regarding, related, v1.EventTypeNormal, "memoryPressure", "killed", "memory pressure") From 006ff4510bfdc1abd89a33c3f99e6084d27a8f5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 2 Jun 2022 21:00:48 +0200 Subject: [PATCH 6/6] Clean shutdown of nodecontroller integration tests --- .../node_lifecycle_controller.go | 28 +++++++++++-------- .../nodelifecycle/scheduler/taint_manager.go | 24 +++++++++++----- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index cfb07d1872d..4394f45ff80 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -300,7 +300,8 @@ type Controller struct { getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error) - recorder record.EventRecorder + broadcaster record.EventBroadcaster + recorder record.EventRecorder // Value controlling Controller monitoring period, i.e. how often does Controller // check node health signal posted from kubelet. 
This value should be lower than @@ -372,13 +373,6 @@ func NewNodeLifecycleController( eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"}) - eventBroadcaster.StartStructuredLogging(0) - - klog.Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink( - &v1core.EventSinkImpl{ - Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""), - }) if kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { ratelimiter.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) @@ -390,6 +384,7 @@ func NewNodeLifecycleController( knownNodeSet: make(map[string]*v1.Node), nodeHealthMap: newNodeHealthMap(), nodeEvictionMap: newNodeEvictionMap(), + broadcaster: eventBroadcaster, recorder: recorder, nodeMonitorPeriod: nodeMonitorPeriod, nodeStartupGracePeriod: nodeStartupGracePeriod, @@ -536,6 +531,19 @@ func NewNodeLifecycleController( func (nc *Controller) Run(ctx context.Context) { defer utilruntime.HandleCrash() + // Start events processing pipeline. + nc.broadcaster.StartStructuredLogging(0) + klog.Infof("Sending events to api server.") + nc.broadcaster.StartRecordingToSink( + &v1core.EventSinkImpl{ + Interface: v1core.New(nc.kubeClient.CoreV1().RESTClient()).Events(""), + }) + defer nc.broadcaster.Shutdown() + + // Close node update queue to cleanup go routine. + defer nc.nodeUpdateQueue.ShutDown() + defer nc.podUpdateQueue.ShutDown() + klog.Infof("Starting node controller") defer klog.Infof("Shutting down node controller") @@ -547,10 +555,6 @@ func (nc *Controller) Run(ctx context.Context) { go nc.taintManager.Run(ctx) } - // Close node update queue to cleanup go routine. - defer nc.nodeUpdateQueue.ShutDown() - defer nc.podUpdateQueue.ShutDown() - // Start workers to reconcile labels and/or update NoSchedule taint for nodes. for i := 0; i < scheduler.UpdateWorkerSize; i++ { // Thanks to "workqueue", each worker just need to get item from queue, because diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go index 129a61c7df0..750a9390819 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go @@ -82,6 +82,7 @@ type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error) // from Nodes tainted with NoExecute Taints. 
type NoExecuteTaintManager struct { client clientset.Interface + broadcaster record.EventBroadcaster recorder record.EventRecorder getPod GetPodFunc getNode GetNodeFunc @@ -158,16 +159,10 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration { func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"}) - eventBroadcaster.StartStructuredLogging(0) - if c != nil { - klog.InfoS("Sending events to api server") - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")}) - } else { - klog.Fatalf("kubeClient is nil when starting NodeController") - } tm := &NoExecuteTaintManager{ client: c, + broadcaster: eventBroadcaster, recorder: recorder, getPod: getPod, getNode: getNode, @@ -184,8 +179,23 @@ func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod // Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed. func (tc *NoExecuteTaintManager) Run(ctx context.Context) { + defer utilruntime.HandleCrash() + klog.InfoS("Starting NoExecuteTaintManager") + // Start events processing pipeline. + tc.broadcaster.StartStructuredLogging(0) + if tc.client != nil { + klog.InfoS("Sending events to api server") + tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")}) + } else { + klog.Fatalf("kubeClient is nil when starting NodeController") + } + defer tc.broadcaster.Shutdown() + + defer tc.nodeUpdateQueue.ShutDown() + defer tc.podUpdateQueue.ShutDown() + for i := 0; i < UpdateWorkerSize; i++ { tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize)) tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
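
The series applies the same pattern everywhere: anything that spawns goroutines (event broadcaster, work queues) is started inside Run(ctx) and torn down with defer, so an integration test only has to cancel its context, wait for Run to return, and then tear the test server down. What follows is a minimal, hypothetical sketch of that pattern, not code from the patches themselves: exampleController, newExampleController, and processNextWorkItem are invented names, while record.NewBroadcaster, StartRecordingToSink, Shutdown, the workqueue helpers, and wait.UntilWithContext are the same client-go/apimachinery calls the patches touch.

// Sketch of the clean-shutdown pattern used across these patches.
// exampleController stands in for DisruptionController / NoExecuteTaintManager.
package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

type exampleController struct {
	client      kubernetes.Interface
	broadcaster record.EventBroadcaster
	queue       workqueue.RateLimitingInterface
}

func newExampleController(client kubernetes.Interface) *exampleController {
	// Only construct the broadcaster and queue here; recording starts in Run,
	// so calling New* alone does not leak goroutines (the problem being fixed).
	return &exampleController{
		client:      client,
		broadcaster: record.NewBroadcaster(),
		queue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example"),
	}
}

func (c *exampleController) Run(ctx context.Context) {
	// Start the events processing pipeline and guarantee it is stopped again.
	c.broadcaster.StartStructuredLogging(0)
	if c.client != nil {
		c.broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
	}
	defer c.broadcaster.Shutdown()

	// Shutting down the queue unblocks workers parked in queue.Get().
	defer c.queue.ShutDown()

	go wait.UntilWithContext(ctx, c.worker, time.Second)

	<-ctx.Done()
}

func (c *exampleController) worker(ctx context.Context) {
	for c.processNextWorkItem(ctx) {
	}
}

func (c *exampleController) processNextWorkItem(ctx context.Context) bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)
	// ... reconcile key ...
	return true
}

func main() {
	// An integration test does the same dance: cancel the context, wait for
	// Run to return, and only then tear down the test API server.
	ctx, cancel := context.WithCancel(context.Background())

	c := newExampleController(fake.NewSimpleClientset())
	done := make(chan struct{})
	go func() {
		defer close(done)
		c.Run(ctx)
	}()

	time.Sleep(100 * time.Millisecond)
	cancel()
	<-done
	klog.Info("controller stopped cleanly")
}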