From c84c27b6ac797347cf03c6776c86327c33a36908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 14:51:46 +0200 Subject: [PATCH 1/2] Clean shutdown of event broadcaster in controllers --- .../cronjob/cronjob_controllerv2.go | 21 +++++++++++++------ .../nodeipam/node_ipam_controller.go | 16 +++++++------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go index 3c0dda0bd67..551861a1825 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2.go +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -62,8 +62,11 @@ var ( // ControllerV2 is a controller for CronJobs. // Refactored Cronjob controller that uses DelayingQueue and informers type ControllerV2 struct { - queue workqueue.RateLimitingInterface - recorder record.EventRecorder + queue workqueue.RateLimitingInterface + + kubeClient clientset.Interface + recorder record.EventRecorder + broadcaster record.EventBroadcaster jobControl jobControlInterface cronJobControl cjControlInterface @@ -81,12 +84,12 @@ type ControllerV2 struct { // NewControllerV2 creates and initializes a new Controller. func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) { eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) jm := &ControllerV2{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"), - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"), + kubeClient: kubeClient, + broadcaster: eventBroadcaster, + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}), jobControl: realJobControl{KubeClient: kubeClient}, cronJobControl: &realCJControl{KubeClient: kubeClient}, @@ -123,6 +126,12 @@ func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer // Run starts the main goroutine responsible for watching and syncing jobs. func (jm *ControllerV2) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() + + // Start event processing pipeline. + jm.broadcaster.StartStructuredLogging(0) + jm.broadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")}) + defer jm.broadcaster.Shutdown() + defer jm.queue.ShutDown() klog.InfoS("Starting cronjob controller v2") diff --git a/pkg/controller/nodeipam/node_ipam_controller.go b/pkg/controller/nodeipam/node_ipam_controller.go index b8dc32f33de..cb0853ffeab 100644 --- a/pkg/controller/nodeipam/node_ipam_controller.go +++ b/pkg/controller/nodeipam/node_ipam_controller.go @@ -55,6 +55,7 @@ type Controller struct { serviceCIDR *net.IPNet secondaryServiceCIDR *net.IPNet kubeClient clientset.Interface + eventBroadcaster record.EventBroadcaster // Method for easy mocking in unittest. lookupIP func(host string) ([]net.IP, error) @@ -84,15 +85,6 @@ func NewNodeIpamController( klog.Fatalf("kubeClient is nil when starting Controller") } - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - - klog.Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink( - &v1core.EventSinkImpl{ - Interface: kubeClient.CoreV1().Events(""), - }) - // Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation. if allocatorType != ipam.CloudAllocatorType { if len(clusterCIDRs) == 0 { @@ -110,6 +102,7 @@ func NewNodeIpamController( ic := &Controller{ cloud: cloud, kubeClient: kubeClient, + eventBroadcaster: record.NewBroadcaster(), lookupIP: net.LookupIP, clusterCIDRs: clusterCIDRs, serviceCIDR: serviceCIDR, @@ -146,6 +139,11 @@ func NewNodeIpamController( func (nc *Controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + // Start event processing pipeline. + nc.eventBroadcaster.StartStructuredLogging(0) + nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")}) + defer nc.eventBroadcaster.Shutdown() + klog.Infof("Starting ipam controller") defer klog.Infof("Shutting down ipam controller") From 3786cfdf855b46044a33de5622452825ad86816b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 16:15:22 +0200 Subject: [PATCH 2/2] Clean shutdown of serving integration test --- cmd/kube-scheduler/app/options/options.go | 22 +++++++++-------- .../k8s.io/cloud-provider/options/options.go | 24 +++++++++++-------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/cmd/kube-scheduler/app/options/options.go b/cmd/kube-scheduler/app/options/options.go index 6a5b2a1c131..dfc7ce86300 100644 --- a/cmd/kube-scheduler/app/options/options.go +++ b/cmd/kube-scheduler/app/options/options.go @@ -222,6 +222,15 @@ func (o *Options) ApplyTo(c *schedulerappconfig.Config) error { c.ComponentConfig = *cfg } + // Build kubeconfig first to so that if it fails, it doesn't cause leaking + // goroutines (started from initializing secure serving - which underneath + // creates a queue which in its constructor starts a goroutine). + kubeConfig, err := createKubeConfig(c.ComponentConfig.ClientConnection, o.Master) + if err != nil { + return err + } + c.KubeConfig = kubeConfig + if err := o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil { return err } @@ -271,14 +280,8 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) { return nil, err } - // Prepare kube config. - kubeConfig, err := createKubeConfig(c.ComponentConfig.ClientConnection, o.Master) - if err != nil { - return nil, err - } - // Prepare kube clients. - client, eventClient, err := createClients(kubeConfig) + client, eventClient, err := createClients(c.KubeConfig) if err != nil { return nil, err } @@ -294,16 +297,15 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) { schedulerName = c.ComponentConfig.Profiles[0].SchedulerName } coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(schedulerName) - leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, kubeConfig, coreRecorder) + leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, c.KubeConfig, coreRecorder) if err != nil { return nil, err } } c.Client = client - c.KubeConfig = kubeConfig c.InformerFactory = scheduler.NewInformerFactory(client, 0) - dynClient := dynamic.NewForConfigOrDie(kubeConfig) + dynClient := dynamic.NewForConfigOrDie(c.KubeConfig) c.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, corev1.NamespaceAll, nil) c.LeaderElection = leaderElectionConfig diff --git a/staging/src/k8s.io/cloud-provider/options/options.go b/staging/src/k8s.io/cloud-provider/options/options.go index 7c29fa39574..6dcfa233a28 100644 --- a/staging/src/k8s.io/cloud-provider/options/options.go +++ b/staging/src/k8s.io/cloud-provider/options/options.go @@ -138,6 +138,20 @@ func (o *CloudControllerManagerOptions) Flags(allControllers, disabledByDefaultC // ApplyTo fills up cloud controller manager config with options. func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, userAgent string) error { var err error + + // Build kubeconfig first to so that if it fails, it doesn't cause leaking + // goroutines (started from initializing secure serving - which underneath + // creates a queue which in its constructor starts a goroutine). + c.Kubeconfig, err = clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig) + if err != nil { + return err + } + c.Kubeconfig.DisableCompression = true + c.Kubeconfig.ContentConfig.AcceptContentTypes = o.Generic.ClientConnection.AcceptContentTypes + c.Kubeconfig.ContentConfig.ContentType = o.Generic.ClientConnection.ContentType + c.Kubeconfig.QPS = o.Generic.ClientConnection.QPS + c.Kubeconfig.Burst = int(o.Generic.ClientConnection.Burst) + if err = o.Generic.ApplyTo(&c.ComponentConfig.Generic); err != nil { return err } @@ -159,16 +173,6 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, userAgent stri } } - c.Kubeconfig, err = clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig) - if err != nil { - return err - } - c.Kubeconfig.DisableCompression = true - c.Kubeconfig.ContentConfig.AcceptContentTypes = o.Generic.ClientConnection.AcceptContentTypes - c.Kubeconfig.ContentConfig.ContentType = o.Generic.ClientConnection.ContentType - c.Kubeconfig.QPS = o.Generic.ClientConnection.QPS - c.Kubeconfig.Burst = int(o.Generic.ClientConnection.Burst) - c.Client, err = clientset.NewForConfig(restclient.AddUserAgent(c.Kubeconfig, userAgent)) if err != nil { return err