Fix event broadcaster shutdown in multiple controllers

Wojciech Tyczyński 2022-05-17 14:48:01 +02:00
parent 81261d4693
commit 11b679c66a
7 changed files with 61 additions and 40 deletions
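Previously each controller started its event broadcaster's sinks (structured logging and the API event sink) in its constructor and never shut them down; this commit moves StartStructuredLogging/StartRecordingToSink into each controller's Run method and defers eventBroadcaster.Shutdown() there, so a constructed-but-never-run controller starts no event goroutines and a stopped controller tears them down cleanly. A minimal, self-contained Go sketch of that lifecycle follows; the broadcaster and controller types here are hypothetical stand-ins for record.EventBroadcaster and the real controllers, not the Kubernetes code itself.

package main

import (
	"context"
	"fmt"
	"time"
)

// broadcaster stands in for record.EventBroadcaster: a background
// pipeline that must be started once and shut down exactly once.
type broadcaster struct {
	events chan string
	done   chan struct{}
}

func newBroadcaster() *broadcaster {
	return &broadcaster{events: make(chan string, 16), done: make(chan struct{})}
}

// startRecording drains the pipeline, mimicking StartRecordingToSink.
func (b *broadcaster) startRecording() {
	go func() {
		defer close(b.done)
		for e := range b.events {
			fmt.Println("event:", e)
		}
	}()
}

// Shutdown closes the pipeline and waits for buffered events to flush,
// which is what makes a deferred call in Run a clean teardown.
func (b *broadcaster) Shutdown() {
	close(b.events)
	<-b.done
}

// controller mirrors the new shape: the broadcaster is stored at
// construction time but nothing is started, so a controller that is
// created and never run leaks no goroutines.
type controller struct{ eventBroadcaster *broadcaster }

func newController() *controller {
	return &controller{eventBroadcaster: newBroadcaster()}
}

// Run starts the events processing pipeline and defers its shutdown,
// matching the ordering this commit introduces in each Run method.
func (c *controller) Run(ctx context.Context) {
	c.eventBroadcaster.startRecording()
	defer c.eventBroadcaster.Shutdown()

	c.eventBroadcaster.events <- "controller started"
	<-ctx.Done() // sync loop elided
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	newController().Run(ctx) // returns on ctx cancellation; Shutdown runs via defer
}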

View File

@@ -84,10 +84,13 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
 // DaemonSetsController is responsible for synchronizing DaemonSet objects stored
 // in the system with actual running pods.
 type DaemonSetsController struct {
-    kubeClient    clientset.Interface
-    eventRecorder record.EventRecorder
-    podControl    controller.PodControlInterface
-    crControl     controller.ControllerRevisionControlInterface
+    kubeClient clientset.Interface
+
+    eventBroadcaster record.EventBroadcaster
+    eventRecorder    record.EventRecorder
+
+    podControl controller.PodControlInterface
+    crControl  controller.ControllerRevisionControlInterface

     // A dsc is temporarily suspended after creating/deleting these many replicas.
     // It resumes normal action after observing the watch events for them.
@@ -138,8 +141,6 @@ func NewDaemonSetsController(
     failedPodsBackoff *flowcontrol.Backoff,
 ) (*DaemonSetsController, error) {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

     if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
         if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
@@ -147,8 +148,9 @@ func NewDaemonSetsController(
         }
     }
     dsc := &DaemonSetsController{
-        kubeClient:    kubeClient,
-        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
+        kubeClient:       kubeClient,
+        eventBroadcaster: eventBroadcaster,
+        eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
         podControl: controller.RealPodControl{
             KubeClient: kubeClient,
             Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
@@ -279,6 +281,11 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    dsc.eventBroadcaster.StartStructuredLogging(0)
+    dsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dsc.kubeClient.CoreV1().Events("")})
+    defer dsc.eventBroadcaster.Shutdown()
+
     defer dsc.queue.ShutDown()

     klog.Infof("Starting daemon sets controller")

View File

@@ -67,9 +67,11 @@ var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
 // in the system with actual running replica sets and pods.
 type DeploymentController struct {
     // rsControl is used for adopting/releasing replica sets.
-    rsControl     controller.RSControlInterface
-    client        clientset.Interface
-    eventRecorder record.EventRecorder
+    rsControl controller.RSControlInterface
+    client    clientset.Interface
+
+    eventBroadcaster record.EventBroadcaster
+    eventRecorder    record.EventRecorder

     // To allow injection of syncDeployment for testing.
     syncHandler func(ctx context.Context, dKey string) error
@@ -100,8 +102,6 @@ type DeploymentController struct {
 // NewDeploymentController creates a new DeploymentController.
 func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

     if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
         if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
@@ -109,9 +109,10 @@ func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInfor
         }
     }
     dc := &DeploymentController{
-        client:        client,
-        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
-        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
+        client:           client,
+        eventBroadcaster: eventBroadcaster,
+        eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
+        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
     }
     dc.rsControl = controller.RealRSControl{
         KubeClient: client,
@@ -148,6 +149,12 @@ func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInfor
 // Run begins watching and syncing.
 func (dc *DeploymentController) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    // Start events processing pipeline.
+    dc.eventBroadcaster.StartStructuredLogging(0)
+    dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
+    defer dc.eventBroadcaster.Shutdown()
+
     defer dc.queue.ShutDown()

     klog.InfoS("Starting controller", "controller", "deployment")

View File

@@ -114,7 +114,8 @@ type Controller struct {
     // Orphan deleted pods that still have a Job tracking finalizer to be removed
     orphanQueue workqueue.RateLimitingInterface

-    recorder record.EventRecorder
+    broadcaster record.EventBroadcaster
+    recorder    record.EventRecorder

     podUpdateBatchPeriod time.Duration
 }
@@ -123,8 +124,6 @@ type Controller struct {
 // in sync with their corresponding Job objects.
 func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

     if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
         ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
@@ -140,6 +139,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
         finalizerExpectations: newUIDTrackingExpectations(),
         queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
         orphanQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
+        broadcaster:           eventBroadcaster,
         recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
     }
     if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
@@ -178,6 +178,12 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 // Run the main goroutine responsible for watching and syncing jobs.
 func (jm *Controller) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    // Start events processing pipeline.
+    jm.broadcaster.StartStructuredLogging(0)
+    jm.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
+    defer jm.broadcaster.Shutdown()
+
     defer jm.queue.ShutDown()
     defer jm.orphanQueue.ShutDown()

View File

@@ -88,6 +88,8 @@ type ReplicaSetController struct {
     kubeClient clientset.Interface
     podControl controller.PodControlInterface

+    eventBroadcaster record.EventBroadcaster
+
     // A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
     // It resumes normal action after observing the watch events for them.
     burstReplicas int
@@ -117,8 +119,6 @@ type ReplicaSetController struct {
 // NewReplicaSetController configures a replica set controller with the specified event recorder
 func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     if err := metrics.Register(legacyregistry.Register); err != nil {
         klog.ErrorS(err, "unable to register metrics")
     }
@@ -130,13 +130,14 @@ func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInf
             KubeClient: kubeClient,
             Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
         },
+        eventBroadcaster,
     )
 }

 // NewBaseController is the implementation of NewReplicaSetController with additional injected
 // parameters so that it can also serve as the implementation of NewReplicationController.
 func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
-    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
+    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {
     if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
         ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
     }
@@ -145,6 +146,7 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
         GroupVersionKind: gvk,
         kubeClient:       kubeClient,
         podControl:       podControl,
+        eventBroadcaster: eventBroadcaster,
         burstReplicas:    burstReplicas,
         expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
         queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
@@ -188,17 +190,15 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
     return rsc
 }

-// SetEventRecorder replaces the event recorder used by the ReplicaSetController
-// with the given recorder. Only used for testing.
-func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
-    // TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
-    // need to pass in a fake.
-    rsc.podControl = controller.RealPodControl{KubeClient: rsc.kubeClient, Recorder: recorder}
-}
-
 // Run begins watching and syncing.
 func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    // Start events processing pipeline.
+    rsc.eventBroadcaster.StartStructuredLogging(0)
+    rsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rsc.kubeClient.CoreV1().Events("")})
+    defer rsc.eventBroadcaster.Shutdown()
+
     defer rsc.queue.ShutDown()

     controllerName := strings.ToLower(rsc.Kind)

View File

@@ -30,7 +30,6 @@ import (
     coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/scheme"
-    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
     "k8s.io/client-go/tools/record"
     "k8s.io/kubernetes/pkg/controller"
     "k8s.io/kubernetes/pkg/controller/replicaset"
@@ -50,8 +49,6 @@ type ReplicationManager struct {
 // NewReplicationManager configures a replication manager with the specified event recorder
 func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     return &ReplicationManager{
         *replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
             v1.SchemeGroupVersion.WithKind("ReplicationController"),
@@ -61,6 +58,7 @@ func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer cor
                 KubeClient: kubeClient,
                 Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
             }},
+            eventBroadcaster,
         ),
     }
 }
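Because ReplicationManager is a thin wrapper over the ReplicaSet machinery, the broadcaster can no longer be created inside NewBaseController: whoever constructs it must hand it down, and the base controller's Run owns its shutdown. A minimal sketch of that injection, with hypothetical types standing in for record.EventBroadcaster and the real controllers:

package main

import "fmt"

// eventBroadcaster stands in for record.EventBroadcaster.
type eventBroadcaster struct{ component string }

func (b *eventBroadcaster) Shutdown() { fmt.Println(b.component, "broadcaster shut down") }

// baseController plays the role of ReplicaSetController: it receives the
// broadcaster instead of constructing one, mirroring the new
// NewBaseController(..., eventBroadcaster record.EventBroadcaster) parameter.
type baseController struct{ eventBroadcaster *eventBroadcaster }

func newBaseController(b *eventBroadcaster) *baseController {
	return &baseController{eventBroadcaster: b}
}

// Run owns the broadcaster lifecycle, regardless of who created it.
func (c *baseController) Run() {
	defer c.eventBroadcaster.Shutdown()
	// sync loop elided
}

// replicationManager wraps baseController the way ReplicationManager embeds
// ReplicaSetController: it creates the broadcaster (choosing the component
// name) and passes the same instance down, so there is a single owner.
type replicationManager struct{ baseController }

func newReplicationManager() *replicationManager {
	b := &eventBroadcaster{component: "replication-controller"}
	return &replicationManager{baseController: *newBaseController(b)}
}

func main() {
	newReplicationManager().Run()
}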

View File

@@ -74,6 +74,8 @@ type StatefulSetController struct {
     revListerSynced cache.InformerSynced
     // StatefulSets that need to be synced.
     queue workqueue.RateLimitingInterface
+    // eventBroadcaster is the core of event processing pipeline.
+    eventBroadcaster record.EventBroadcaster
 }

 // NewStatefulSetController creates a new statefulset controller.
@@ -85,8 +87,6 @@ func NewStatefulSetController(
     kubeClient clientset.Interface,
 ) *StatefulSetController {
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
     ssc := &StatefulSetController{
         kubeClient: kubeClient,
@@ -101,10 +101,11 @@ func NewStatefulSetController(
             recorder,
         ),
         pvcListerSynced: pvcInformer.Informer().HasSynced,
-        revListerSynced: revInformer.Informer().HasSynced,
         queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
         podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
+        revListerSynced:  revInformer.Informer().HasSynced,
+        eventBroadcaster: eventBroadcaster,
     }

     podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -142,6 +143,12 @@ func NewStatefulSetController(
 // Run runs the statefulset controller.
 func (ssc *StatefulSetController) Run(ctx context.Context, workers int) {
     defer utilruntime.HandleCrash()
+
+    // Start events processing pipeline.
+    ssc.eventBroadcaster.StartStructuredLogging(0)
+    ssc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ssc.kubeClient.CoreV1().Events("")})
+    defer ssc.eventBroadcaster.Shutdown()
+
     defer ssc.queue.ShutDown()

     klog.Infof("Starting stateful set controller")

View File

@@ -41,7 +41,6 @@ import (
     "k8s.io/client-go/informers"
     clientset "k8s.io/client-go/kubernetes"
    restclient "k8s.io/client-go/rest"
-    "k8s.io/client-go/tools/record"
     watchtools "k8s.io/client-go/tools/watch"
     "k8s.io/kubernetes/pkg/controller"
     replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
@@ -102,7 +101,6 @@ func TestQuota(t *testing.T) {
         clientset,
         replicationcontroller.BurstReplicas,
     )
-    rm.SetEventRecorder(&record.FakeRecorder{})
     go rm.Run(ctx, 3)

     discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
@@ -333,7 +331,6 @@ func TestQuotaLimitedResourceDenial(t *testing.T) {
         clientset,
         replicationcontroller.BurstReplicas,
     )
-    rm.SetEventRecorder(&record.FakeRecorder{})
     go rm.Run(ctx, 3)

     discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
@@ -462,7 +459,6 @@ func TestQuotaLimitService(t *testing.T) {
         clientset,
         replicationcontroller.BurstReplicas,
     )
-    rm.SetEventRecorder(&record.FakeRecorder{})
     go rm.Run(ctx, 3)

     discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
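The removed SetEventRecorder calls were a workaround for the old lifecycle: since the broadcaster could not be shut down, the tests swapped in a FakeRecorder to avoid leaking event goroutines. With Shutdown now deferred inside Run, cancelling the test's context is enough to tear everything down. A minimal sketch of that teardown order, with a hypothetical manager type standing in for the real controllers:

package main

import (
	"context"
	"fmt"
	"sync"
)

type manager struct{ shutdown func() }

// Run blocks until ctx is cancelled, then executes its deferred cleanup,
// mirroring the controllers' Run(ctx, workers) methods in this commit.
func (m *manager) Run(ctx context.Context, workers int) {
	defer m.shutdown() // stands in for eventBroadcaster.Shutdown()
	<-ctx.Done()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	rm := &manager{shutdown: func() { fmt.Println("event broadcaster shut down") }}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		rm.Run(ctx, 3) // same call shape as the tests' go rm.Run(ctx, 3)
	}()

	cancel()  // test teardown
	wg.Wait() // Run has returned and the deferred shutdown has executed
}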