From 11b679c66ad0bf80997ec0757eb3bf987ef810e3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?=
Date: Tue, 17 May 2022 14:48:01 +0200
Subject: [PATCH] Fix event broadcaster shutdown in multiple controllers

---
 pkg/controller/daemon/daemon_controller.go  | 23 ++++++++++++-------
 .../deployment/deployment_controller.go     | 23 ++++++++++++-------
 pkg/controller/job/job_controller.go        | 12 +++++++---
 pkg/controller/replicaset/replica_set.go    | 22 +++++++++---------
 .../replication/replication_controller.go   |  4 +---
 pkg/controller/statefulset/stateful_set.go  | 13 ++++++++---
 test/integration/quota/quota_test.go        |  4 ----
 7 files changed, 61 insertions(+), 40 deletions(-)

diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go
index e31dde1eacc..864089fefac 100644
--- a/pkg/controller/daemon/daemon_controller.go
+++ b/pkg/controller/daemon/daemon_controller.go
@@ -84,10 +84,13 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
 // DaemonSetsController is responsible for synchronizing DaemonSet objects stored
 // in the system with actual running pods.
 type DaemonSetsController struct {
-	kubeClient    clientset.Interface
-	eventRecorder record.EventRecorder
-	podControl    controller.PodControlInterface
-	crControl     controller.ControllerRevisionControlInterface
+	kubeClient clientset.Interface
+
+	eventBroadcaster record.EventBroadcaster
+	eventRecorder    record.EventRecorder
+
+	podControl controller.PodControlInterface
+	crControl  controller.ControllerRevisionControlInterface
 
 	// An dsc is temporarily suspended after creating/deleting these many replicas.
 	// It resumes normal action after observing the watch events for them.
@@ -138,8 +141,6 @@ func NewDaemonSetsController(
 	failedPodsBackoff *flowcontrol.Backoff,
 ) (*DaemonSetsController, error) {
 	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartStructuredLogging(0)
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 
 	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
 		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
@@ -147,8 +148,9 @@ func NewDaemonSetsController(
 		}
 	}
 	dsc := &DaemonSetsController{
-		kubeClient:    kubeClient,
-		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
+		kubeClient:       kubeClient,
+		eventBroadcaster: eventBroadcaster,
+		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
 			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
@@ -279,6 +281,11 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
+
+	dsc.eventBroadcaster.StartStructuredLogging(0)
+	dsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dsc.kubeClient.CoreV1().Events("")})
+	defer dsc.eventBroadcaster.Shutdown()
+
 	defer dsc.queue.ShutDown()
 
 	klog.Infof("Starting daemon sets controller")
diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go
index ee4e3bdc7da..97461dd0255 100644
--- a/pkg/controller/deployment/deployment_controller.go
+++ b/pkg/controller/deployment/deployment_controller.go
@@ -67,9 +67,11 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
 // in the system with actual running replica sets and pods.
 type DeploymentController struct {
 	// rsControl is used for adopting/releasing replica sets.
-	rsControl     controller.RSControlInterface
-	client        clientset.Interface
-	eventRecorder record.EventRecorder
+	rsControl controller.RSControlInterface
+	client    clientset.Interface
+
+	eventBroadcaster record.EventBroadcaster
+	eventRecorder    record.EventRecorder
 
 	// To allow injection of syncDeployment for testing.
 	syncHandler func(ctx context.Context, dKey string) error
@@ -100,8 +102,6 @@ type DeploymentController struct {
 // NewDeploymentController creates a new DeploymentController.
 func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
 	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartStructuredLogging(0)
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
 
 	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
 		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
@@ -109,9 +109,10 @@ func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInfor
 		}
 	}
 	dc := &DeploymentController{
-		client:        client,
-		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
-		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
+		client:           client,
+		eventBroadcaster: eventBroadcaster,
+		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
+		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
 	}
 	dc.rsControl = controller.RealRSControl{
 		KubeClient: client,
@@ -148,6 +149,12 @@ func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInfor
 // Run begins watching and syncing.
 func (dc *DeploymentController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
+
+	// Start events processing pipeline.
+	dc.eventBroadcaster.StartStructuredLogging(0)
+	dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
+	defer dc.eventBroadcaster.Shutdown()
+
 	defer dc.queue.ShutDown()
 
 	klog.InfoS("Starting controller", "controller", "deployment")
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 158c5022cc5..8941f136c22 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -114,7 +114,8 @@ type Controller struct {
 	// Orphan deleted pods that still have a Job tracking finalizer to be removed
 	orphanQueue workqueue.RateLimitingInterface
 
-	recorder record.EventRecorder
+	broadcaster record.EventBroadcaster
+	recorder    record.EventRecorder
 
 	podUpdateBatchPeriod time.Duration
 }
@@ -123,8 +124,6 @@ type Controller struct {
 // in sync with their corresponding Job objects.
 func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
 	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartStructuredLogging(0)
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 
 	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
 		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
@@ -140,6 +139,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 		finalizerExpectations: newUIDTrackingExpectations(),
 		queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
 		orphanQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
+		broadcaster:           eventBroadcaster,
 		recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 	}
 	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
@@ -178,6 +178,12 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 // Run the main goroutine responsible for watching and syncing jobs.
 func (jm *Controller) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
+
+	// Start events processing pipeline.
+	jm.broadcaster.StartStructuredLogging(0)
+	jm.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
+	defer jm.broadcaster.Shutdown()
+
 	defer jm.queue.ShutDown()
 	defer jm.orphanQueue.ShutDown()
 
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index fcd5a072c21..a8518c4e15b 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -88,6 +88,8 @@ type ReplicaSetController struct {
 	kubeClient clientset.Interface
 	podControl controller.PodControlInterface
 
+	eventBroadcaster record.EventBroadcaster
+
 	// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
 	// It resumes normal action after observing the watch events for them.
 	burstReplicas int
@@ -117,8 +119,6 @@ type ReplicaSetController struct {
 // NewReplicaSetController configures a replica set controller with the specified event recorder
 func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
 	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartStructuredLogging(0)
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 	if err := metrics.Register(legacyregistry.Register); err != nil {
 		klog.ErrorS(err, "unable to register metrics")
 	}
@@ -130,13 +130,14 @@ func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInf
 			KubeClient: kubeClient,
 			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
 		},
+		eventBroadcaster,
 	)
 }
 
 // NewBaseController is the implementation of NewReplicaSetController with additional injected
 // parameters so that it can also serve as the implementation of NewReplicationController.
 func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
-	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
+	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {
 	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
 		ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
 	}
@@ -145,6 +146,7 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
 		GroupVersionKind: gvk,
 		kubeClient:       kubeClient,
 		podControl:       podControl,
+		eventBroadcaster: eventBroadcaster,
 		burstReplicas:    burstReplicas,
 		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
@@ -188,17 +190,15 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
 	return rsc
 }
 
-// SetEventRecorder replaces the event recorder used by the ReplicaSetController
-// with the given recorder. Only used for testing.
-func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
-	// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
-	// need to pass in a fake.
-	rsc.podControl = controller.RealPodControl{KubeClient: rsc.kubeClient, Recorder: recorder}
-}
-
 // Run begins watching and syncing.
 func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
+
+	// Start events processing pipeline.
+	rsc.eventBroadcaster.StartStructuredLogging(0)
+	rsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rsc.kubeClient.CoreV1().Events("")})
+	defer rsc.eventBroadcaster.Shutdown()
+
 	defer rsc.queue.ShutDown()
 
 	controllerName := strings.ToLower(rsc.Kind)
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index 187367e7118..013e77d1c5f 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -30,7 +30,6 @@ import (
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
-	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/replicaset"
@@ -50,8 +49,6 @@ type ReplicationManager struct {
 // NewReplicationManager configures a replication manager with the specified event recorder
 func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
 	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartStructuredLogging(0)
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 	return &ReplicationManager{
 		*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
 			v1.SchemeGroupVersion.WithKind("ReplicationController"),
@@ -61,6 +58,7 @@ func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer cor
 				KubeClient: kubeClient,
 				Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
 			}},
+			eventBroadcaster,
 		),
 	}
 }
diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go
index 57da437f897..7ae634f29fa 100644
--- a/pkg/controller/statefulset/stateful_set.go
+++ b/pkg/controller/statefulset/stateful_set.go
@@ -74,6 +74,8 @@ type StatefulSetController struct {
 	revListerSynced cache.InformerSynced
 	// StatefulSets that need to be synced.
 	queue workqueue.RateLimitingInterface
+	// eventBroadcaster is the core of event processing pipeline.
+	eventBroadcaster record.EventBroadcaster
 }
 
 // NewStatefulSetController creates a new statefulset controller.
@@ -85,8 +87,6 @@ func NewStatefulSetController(
 	kubeClient clientset.Interface,
 ) *StatefulSetController {
 	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartStructuredLogging(0)
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
 	ssc := &StatefulSetController{
 		kubeClient: kubeClient,
@@ -101,10 +101,11 @@ func NewStatefulSetController(
 			recorder,
 		),
 		pvcListerSynced: pvcInformer.Informer().HasSynced,
+		revListerSynced: revInformer.Informer().HasSynced,
 		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
 		podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
 
-		revListerSynced: revInformer.Informer().HasSynced,
+		eventBroadcaster: eventBroadcaster,
 	}
 
 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -142,6 +143,12 @@ func NewStatefulSetController(
 // Run runs the statefulset controller.
 func (ssc *StatefulSetController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
+
+	// Start events processing pipeline.
+	ssc.eventBroadcaster.StartStructuredLogging(0)
+	ssc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ssc.kubeClient.CoreV1().Events("")})
+	defer ssc.eventBroadcaster.Shutdown()
+
 	defer ssc.queue.ShutDown()
 
 	klog.Infof("Starting stateful set controller")
diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go
index a173ab2a98d..e8bb3dad27e 100644
--- a/test/integration/quota/quota_test.go
+++ b/test/integration/quota/quota_test.go
@@ -41,7 +41,6 @@ import (
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/record"
 	watchtools "k8s.io/client-go/tools/watch"
 	"k8s.io/kubernetes/pkg/controller"
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
@@ -102,7 +101,6 @@ func TestQuota(t *testing.T) {
 		clientset,
 		replicationcontroller.BurstReplicas,
 	)
-	rm.SetEventRecorder(&record.FakeRecorder{})
 	go rm.Run(ctx, 3)
 
 	discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
@@ -333,7 +331,6 @@ func TestQuotaLimitedResourceDenial(t *testing.T) {
 		clientset,
 		replicationcontroller.BurstReplicas,
 	)
-	rm.SetEventRecorder(&record.FakeRecorder{})
 	go rm.Run(ctx, 3)
 
 	discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
@@ -462,7 +459,6 @@ func TestQuotaLimitService(t *testing.T) {
 		clientset,
 		replicationcontroller.BurstReplicas,
 	)
-	rm.SetEventRecorder(&record.FakeRecorder{})
 	go rm.Run(ctx, 3)
 
 	discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
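For reference, a minimal self-contained sketch of the lifecycle this patch establishes in each controller; the package name, the exampleController type, and its fields below are illustrative and not taken from Kubernetes. The broadcaster is created in the constructor but only started from Run(), which also defers Shutdown() so the event-processing goroutines stop when the controller stops.

// Sketch only: illustrative names, not code from the patch.
package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

type exampleController struct {
	kubeClient       clientset.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder
}

func newExampleController(kubeClient clientset.Interface) *exampleController {
	// Create, but do not start, the broadcaster: constructors used by tests
	// and benchmarks no longer leak event-processing goroutines.
	eventBroadcaster := record.NewBroadcaster()
	return &exampleController{
		kubeClient:       kubeClient,
		eventBroadcaster: eventBroadcaster,
		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"}),
	}
}

func (c *exampleController) Run(ctx context.Context) {
	// Start the events pipeline only when the controller actually runs,
	// and shut it down again when Run returns.
	c.eventBroadcaster.StartStructuredLogging(0)
	c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
	defer c.eventBroadcaster.Shutdown()

	<-ctx.Done()
}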