Clean shutdown of cloud controllers in integration tests

Wojciech Tyczyński 2022-07-21 13:47:01 +02:00
parent 28fc2991a5
commit f3996039ef
4 changed files with 40 additions and 21 deletions
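
All four controllers get the same treatment: the event broadcaster is still created in the constructor, but StartStructuredLogging/StartRecordingToSink now run at the top of Run(), paired with a deferred Shutdown(), so stopping a controller (as the integration tests do) also stops its event pipeline instead of leaking it. The sketch below is illustrative only and is not code from this commit; demoController and newDemoController are invented names standing in for the real controllers, and only the broadcaster handling is shown.

// Illustrative sketch only: demoController stands in for the four
// controllers touched by this commit; it is not code from the Kubernetes tree.
package demo

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
)

type demoController struct {
	kubeClient  kubernetes.Interface
	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder
}

// newDemoController only wires objects together; nothing is started here.
// This is the constructor side of the pattern after this commit.
func newDemoController(kubeClient kubernetes.Interface) *demoController {
	broadcaster := record.NewBroadcaster()
	return &demoController{
		kubeClient:  kubeClient,
		broadcaster: broadcaster,
		recorder:    broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "demo-controller"}),
	}
}

// Run starts the event processing pipeline and tears it down again when it
// returns, so a test that stops the controller does not leak the
// broadcaster's goroutines and watches.
func (c *demoController) Run(stopCh <-chan struct{}) {
	klog.Info("Sending events to api server")
	c.broadcaster.StartStructuredLogging(0)
	c.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
	defer c.broadcaster.Shutdown()

	<-stopCh
}

The per-file hunks below apply exactly this move to CloudNodeController, CloudNodeLifecycleController, RouteController, and the service Controller.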


@@ -93,7 +93,9 @@ var UpdateNodeSpecBackoff = wait.Backoff{
 type CloudNodeController struct {
     nodeInformer coreinformers.NodeInformer
     kubeClient   clientset.Interface
-    recorder     record.EventRecorder
+    broadcaster record.EventBroadcaster
+    recorder    record.EventRecorder
     cloud cloudprovider.Interface
@@ -113,10 +115,6 @@ func NewCloudNodeController(
     eventBroadcaster := record.NewBroadcaster()
     recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
-    eventBroadcaster.StartStructuredLogging(0)
-    klog.Infof("Sending events to api server.")
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     _, instancesSupported := cloud.Instances()
     _, instancesV2Supported := cloud.InstancesV2()
@@ -127,6 +125,7 @@ func NewCloudNodeController(
     cnc := &CloudNodeController{
         nodeInformer:              nodeInformer,
         kubeClient:                kubeClient,
+        broadcaster:               eventBroadcaster,
         recorder:                  recorder,
         cloud:                     cloud,
         nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
@@ -153,6 +152,12 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
     defer utilruntime.HandleCrash()
     defer cnc.workqueue.ShutDown()
+    // Start event processing pipeline.
+    klog.Infof("Sending events to api server.")
+    cnc.broadcaster.StartStructuredLogging(0)
+    cnc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cnc.kubeClient.CoreV1().Events("")})
+    defer cnc.broadcaster.Shutdown()
     // Wait for the caches to be synced before starting workers
     klog.Info("Waiting for informer caches to sync")
     if ok := cache.WaitForCacheSync(stopCh, cnc.nodesSynced); !ok {


@@ -55,7 +55,9 @@ var ShutdownTaint = &v1.Taint{
 type CloudNodeLifecycleController struct {
     kubeClient clientset.Interface
     nodeLister v1lister.NodeLister
-    recorder   record.EventRecorder
+    broadcaster record.EventBroadcaster
+    recorder    record.EventRecorder
     cloud cloudprovider.Interface
@@ -73,10 +75,6 @@ func NewCloudNodeLifecycleController(
     eventBroadcaster := record.NewBroadcaster()
     recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
-    eventBroadcaster.StartStructuredLogging(0)
-    klog.Info("Sending events to api server")
-    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     if kubeClient == nil {
         return nil, errors.New("kubernetes client is nil")
@@ -95,6 +93,7 @@ func NewCloudNodeLifecycleController(
     c := &CloudNodeLifecycleController{
         kubeClient:        kubeClient,
         nodeLister:        nodeInformer.Lister(),
+        broadcaster:       eventBroadcaster,
         recorder:          recorder,
         cloud:             cloud,
         nodeMonitorPeriod: nodeMonitorPeriod,
@@ -110,6 +109,12 @@ func (c *CloudNodeLifecycleController) Run(ctx context.Context, controllerManage
     controllerManagerMetrics.ControllerStarted("cloud-node-lifecycle")
     defer controllerManagerMetrics.ControllerStopped("cloud-node-lifecycle")
+    // Start event processing pipeline.
+    klog.Info("Sending events to api server")
+    c.broadcaster.StartStructuredLogging(0)
+    c.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
+    defer c.broadcaster.Shutdown()
     // The following loops run communicate with the APIServer with a worst case complexity
     // of O(num_nodes) per cycle. These functions are justified here because these events fire
     // very infrequently. DO NOT MODIFY this to perform frequent operations.


@@ -78,7 +78,6 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
     }
     eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartStructuredLogging(0)
     recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})
     rc := &RouteController{
@@ -98,6 +97,13 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
 func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration) {
     defer utilruntime.HandleCrash()
+    // Start event processing pipeline.
+    if rc.broadcaster != nil {
+        rc.broadcaster.StartStructuredLogging(0)
+        rc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rc.kubeClient.CoreV1().Events("")})
+        defer rc.broadcaster.Shutdown()
+    }
     klog.Info("Starting route controller")
     defer klog.Info("Shutting down route controller")
@@ -105,10 +111,6 @@ func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration) {
         return
     }
-    if rc.broadcaster != nil {
-        rc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rc.kubeClient.CoreV1().Events("")})
-    }
     // TODO: If we do just the full Resync every 5 minutes (default value)
     // that means that we may wait up to 5 minutes before even starting
     // creating a route for it. This is bad.


@@ -110,8 +110,6 @@ func New(
     featureGate featuregate.FeatureGate,
 ) (*Controller, error) {
     broadcaster := record.NewBroadcaster()
-    broadcaster.StartStructuredLogging(0)
-    broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
     recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
     if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
@@ -230,6 +228,11 @@ func (s *Controller) Run(ctx context.Context, workers int) {
     defer runtime.HandleCrash()
     defer s.queue.ShutDown()
+    // Start event processing pipeline.
+    s.eventBroadcaster.StartStructuredLogging(0)
+    s.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.kubeClient.CoreV1().Events("")})
+    defer s.eventBroadcaster.Shutdown()
     klog.Info("Starting service controller")
     defer klog.Info("Shutting down service controller")
@@ -287,11 +290,15 @@ func (s *Controller) worker(ctx context.Context) {
 // nodeSyncLoop takes nodeSync signal and triggers nodeSync
 func (s *Controller) nodeSyncLoop(ctx context.Context, workers int) {
     klog.V(4).Info("nodeSyncLoop Started")
-    for range s.nodeSyncCh {
-        klog.V(4).Info("nodeSync has been triggered")
-        s.nodeSyncInternal(ctx, workers)
+    for {
+        select {
+        case <-s.nodeSyncCh:
+            klog.V(4).Info("nodeSync has been triggered")
+            s.nodeSyncInternal(ctx, workers)
+        case <-ctx.Done():
+            return
+        }
     }
-    klog.V(2).Info("s.nodeSyncCh is closed. Exiting nodeSyncLoop")
 }
 func (s *Controller) processNextWorkItem(ctx context.Context) bool {
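
The nodeSyncLoop change in the last hunk serves the same clean-shutdown goal: for range s.nodeSyncCh only returns once the channel is closed, while selecting on ctx.Done() lets the loop exit as soon as the controller's context is cancelled. Below is a stripped-down, self-contained version of that pattern; syncLoop and syncCh are invented names for illustration, not code from this commit.

// Illustrative sketch only, not code from this commit.
package main

import (
	"context"
	"fmt"
)

// syncLoop mirrors the new shape of nodeSyncLoop: it handles sync signals
// until its context is cancelled, rather than until the channel is closed.
func syncLoop(ctx context.Context, syncCh <-chan struct{}) {
	for {
		select {
		case <-syncCh:
			fmt.Println("sync has been triggered")
		case <-ctx.Done():
			// Return on cancellation even though syncCh is never closed.
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	syncCh := make(chan struct{})

	done := make(chan struct{})
	go func() {
		syncLoop(ctx, syncCh)
		close(done)
	}()

	syncCh <- struct{}{} // deliver one sync signal
	cancel()             // clean shutdown: no need to close syncCh
	<-done               // syncLoop has returned
}

Here cancel() alone is enough to unblock the goroutine; nothing ever has to close syncCh, which is what lets the integration tests stop the service controller cleanly.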