Remove Stop from ControllerContext and pass ctx.Done

This commit is contained in:
Mike Dame 2021-09-22 16:11:00 -04:00
parent 80dcf7df1b
commit bfd7f72e9b
11 changed files with 90 additions and 94 deletions

View File

@ -45,7 +45,7 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err) return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
} }
go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), controllerContext.Stop) go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -56,7 +56,7 @@ func startStatefulSetController(ctx context.Context, controllerContext Controlle
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(), controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"), controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), controllerContext.Stop) ).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -66,7 +66,7 @@ func startReplicaSetController(ctx context.Context, controllerContext Controller
controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"), controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas, replicaset.BurstReplicas,
).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), controllerContext.Stop) ).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -80,6 +80,6 @@ func startDeploymentController(ctx context.Context, controllerContext Controller
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err) return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
} }
go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), controllerContext.Stop) go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -40,37 +40,37 @@ func startHPAController(ctx context.Context, controllerContext ControllerContext
return nil, false, nil return nil, false, nil
} }
return startHPAControllerWithRESTClient(controllerContext) return startHPAControllerWithRESTClient(ctx, controllerContext)
} }
func startHPAControllerWithRESTClient(ctx ControllerContext) (controller.Interface, bool, error) { func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery()) apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
// invalidate the discovery information roughly once per resync interval our API // invalidate the discovery information roughly once per resync interval our API
// information is *at most* two resync intervals old. // information is *at most* two resync intervals old.
go custom_metrics.PeriodicallyInvalidate( go custom_metrics.PeriodicallyInvalidate(
apiVersionsGetter, apiVersionsGetter,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.Stop) ctx.Done())
metricsClient := metrics.NewRESTMetricsClient( metricsClient := metrics.NewRESTMetricsClient(
resourceclient.NewForConfigOrDie(clientConfig), resourceclient.NewForConfigOrDie(clientConfig),
custom_metrics.NewForConfig(clientConfig, ctx.RESTMapper, apiVersionsGetter), custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
external_metrics.NewForConfigOrDie(clientConfig), external_metrics.NewForConfigOrDie(clientConfig),
) )
return startHPAControllerWithMetricsClient(ctx, metricsClient) return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient)
} }
func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) { func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) {
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
// we don't use cached discovery because DiscoveryScaleKindResolver does its own caching, // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
// so we want to re-fetch every time when we actually ask for it // so we want to re-fetch every time when we actually ask for it
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery()) scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -79,15 +79,15 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me
hpaClient.CoreV1(), hpaClient.CoreV1(),
scaleClient, scaleClient,
hpaClient.AutoscalingV1(), hpaClient.AutoscalingV1(),
ctx.RESTMapper, controllerContext.RESTMapper,
metricsClient, metricsClient,
ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), controllerContext.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
ctx.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Pods(),
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
).Run(ctx.Stop) ).Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -36,7 +36,7 @@ func startJobController(ctx context.Context, controllerContext ControllerContext
controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.ClientBuilder.ClientOrDie("job-controller"), controllerContext.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), controllerContext.Stop) ).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -49,7 +49,7 @@ func startCronJobController(ctx context.Context, controllerContext ControllerCon
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err) return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err)
} }
go cj2c.Run(int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), controllerContext.Stop) go cj2c.Run(int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
cjc, err := cronjob.NewController( cjc, err := cronjob.NewController(
@ -58,6 +58,6 @@ func startCronJobController(ctx context.Context, controllerContext ControllerCon
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating CronJob controller: %v", err) return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
} }
go cjc.Run(controllerContext.Stop) go cjc.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -34,7 +34,7 @@ func startBootstrapSignerController(ctx context.Context, controllerContext Contr
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err) return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err)
} }
go bsc.Run(controllerContext.Stop) go bsc.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -47,6 +47,6 @@ func startTokenCleanerController(ctx context.Context, controllerContext Controll
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err) return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err)
} }
go tcc.Run(controllerContext.Stop) go tcc.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -52,7 +52,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err) return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err)
} }
go kubeletServingSigner.Run(5, controllerContext.Stop) go kubeletServingSigner.Run(5, ctx.Done())
} else { } else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving") klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving")
} }
@ -62,7 +62,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err) return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err)
} }
go kubeletClientSigner.Run(5, controllerContext.Stop) go kubeletClientSigner.Run(5, ctx.Done())
} else { } else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet") klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet")
} }
@ -72,7 +72,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err) return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err)
} }
go kubeAPIServerClientSigner.Run(5, controllerContext.Stop) go kubeAPIServerClientSigner.Run(5, ctx.Done())
} else { } else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client") klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client")
} }
@ -82,7 +82,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err) return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err)
} }
go legacyUnknownSigner.Run(5, controllerContext.Stop) go legacyUnknownSigner.Run(5, ctx.Done())
} else { } else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown") klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown")
} }
@ -153,7 +153,7 @@ func startCSRApprovingController(ctx context.Context, controllerContext Controll
controllerContext.ClientBuilder.ClientOrDie("certificate-controller"), controllerContext.ClientBuilder.ClientOrDie("certificate-controller"),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
) )
go approver.Run(5, controllerContext.Stop) go approver.Run(5, ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -163,7 +163,7 @@ func startCSRCleanerController(ctx context.Context, controllerContext Controller
controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(), controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
) )
go cleaner.Run(1, controllerContext.Stop) go cleaner.Run(1, ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -189,6 +189,6 @@ func startRootCACertPublisher(ctx context.Context, controllerContext ControllerC
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err) return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err)
} }
go sac.Run(1, controllerContext.Stop) go sac.Run(1, ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -229,8 +229,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
klog.Fatalf("error starting controllers: %v", err) klog.Fatalf("error starting controllers: %v", err)
} }
controllerContext.InformerFactory.Start(controllerContext.Stop) controllerContext.InformerFactory.Start(stopCh)
controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop) controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted) close(controllerContext.InformersStarted)
select {} select {}
@ -352,9 +352,6 @@ type ControllerContext struct {
// ExternalLoops is for a kube-controller-manager running with a cloud-controller-manager // ExternalLoops is for a kube-controller-manager running with a cloud-controller-manager
LoopMode ControllerLoopMode LoopMode ControllerLoopMode
// Stop is the stop channel
Stop <-chan struct{}
// InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe, // InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe,
// for an individual controller to start the shared informers. Before it is closed, they should not. // for an individual controller to start the shared informers. Before it is closed, they should not.
InformersStarted chan struct{} InformersStarted chan struct{}
@ -535,7 +532,6 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
AvailableResources: availableResources, AvailableResources: availableResources,
Cloud: cloud, Cloud: cloud,
LoopMode: loopMode, LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}), InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s), ResyncPeriod: ResyncPeriod(s),
} }
@ -556,7 +552,7 @@ func StartControllers(ctx context.Context, controllerCtx ControllerContext, star
// Initialize the cloud provider with a reference to the clientBuilder only after token controller // Initialize the cloud provider with a reference to the clientBuilder only after token controller
// has started in case the cloud provider uses the client builder. // has started in case the cloud provider uses the client builder.
if controllerCtx.Cloud != nil { if controllerCtx.Cloud != nil {
controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, controllerCtx.Stop) controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done())
} }
var controllerChecks []healthz.HealthChecker var controllerChecks []healthz.HealthChecker
@ -653,10 +649,10 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating Tokens controller: %v", err) return nil, true, fmt.Errorf("error creating Tokens controller: %v", err)
} }
go controller.Run(int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs), controllerContext.Stop) go controller.Run(int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs), ctx.Done())
// start the first set of informers now so that other controllers can start // start the first set of informers now so that other controllers can start
controllerContext.InformerFactory.Start(controllerContext.Stop) controllerContext.InformerFactory.Start(ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -92,7 +92,7 @@ func startServiceController(ctx context.Context, controllerContext ControllerCon
klog.Errorf("Failed to start service controller: %v", err) klog.Errorf("Failed to start service controller: %v", err)
return nil, false, nil return nil, false, nil
} }
go serviceController.Run(controllerContext.Stop, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) go serviceController.Run(ctx.Done(), int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
return nil, true, nil return nil, true, nil
} }
@ -168,7 +168,7 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo
if err != nil { if err != nil {
return nil, true, err return nil, true, err
} }
go nodeIpamController.Run(controllerContext.Stop) go nodeIpamController.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -193,7 +193,7 @@ func startNodeLifecycleController(ctx context.Context, controllerContext Control
if err != nil { if err != nil {
return nil, true, err return nil, true, err
} }
go lifecycleController.Run(controllerContext.Stop) go lifecycleController.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -212,7 +212,7 @@ func startCloudNodeLifecycleController(ctx context.Context, controllerContext Co
return nil, false, nil return nil, false, nil
} }
go cloudNodeLifecycleController.Run(controllerContext.Stop) go cloudNodeLifecycleController.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -252,7 +252,7 @@ func startRouteController(ctx context.Context, controllerContext ControllerConte
controllerContext.InformerFactory.Core().V1().Nodes(), controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ComponentConfig.KubeCloudShared.ClusterName, controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
clusterCIDRs) clusterCIDRs)
go routeController.Run(controllerContext.Stop, controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) go routeController.Run(ctx.Done(), controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
return nil, true, nil return nil, true, nil
} }
@ -285,17 +285,17 @@ func startPersistentVolumeBinderController(ctx context.Context, controllerContex
if volumeControllerErr != nil { if volumeControllerErr != nil {
return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr) return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
} }
go volumeController.Run(controllerContext.Stop) go volumeController.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }
func startAttachDetachController(_ context.Context, ctx ControllerContext) (controller.Interface, bool, error) { func startAttachDetachController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second { if controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period") return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period")
} }
csiNodeInformer := ctx.InformerFactory.Storage().V1().CSINodes() csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes()
csiDriverInformer := ctx.InformerFactory.Storage().V1().CSIDrivers() csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers()
plugins, err := ProbeAttachableVolumePlugins() plugins, err := ProbeAttachableVolumePlugins()
if err != nil { if err != nil {
@ -303,34 +303,34 @@ func startAttachDetachController(_ context.Context, ctx ControllerContext) (cont
} }
filteredDialOptions, err := options.ParseVolumeHostFilters( filteredDialOptions, err := options.ParseVolumeHostFilters(
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
if err != nil { if err != nil {
return nil, true, err return nil, true, err
} }
attachDetachController, attachDetachControllerErr := attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController( attachdetach.NewAttachDetachController(
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"), controllerContext.ClientBuilder.ClientOrDie("attachdetach-controller"),
ctx.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(), controllerContext.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(), controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
csiNodeInformer, csiNodeInformer,
csiDriverInformer, csiDriverInformer,
ctx.InformerFactory.Storage().V1().VolumeAttachments(), controllerContext.InformerFactory.Storage().V1().VolumeAttachments(),
ctx.Cloud, controllerContext.Cloud,
plugins, plugins,
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
ctx.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync, controllerContext.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync,
ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration, controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration,
attachdetach.DefaultTimerConfig, attachdetach.DefaultTimerConfig,
filteredDialOptions, filteredDialOptions,
) )
if attachDetachControllerErr != nil { if attachDetachControllerErr != nil {
return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr) return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
} }
go attachDetachController.Run(ctx.Stop) go attachDetachController.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -361,7 +361,7 @@ func startVolumeExpandController(ctx context.Context, controllerContext Controll
if expandControllerErr != nil { if expandControllerErr != nil {
return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr) return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr)
} }
go expandController.Run(controllerContext.Stop) go expandController.Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }
return nil, false, nil return nil, false, nil
@ -376,7 +376,7 @@ func startEphemeralVolumeController(ctx context.Context, controllerContext Contr
if err != nil { if err != nil {
return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err) return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err)
} }
go ephemeralController.Run(int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), controllerContext.Stop) go ephemeralController.Run(int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
return nil, false, nil return nil, false, nil
@ -389,7 +389,7 @@ func startEndpointController(ctx context.Context, controllerCtx ControllerContex
controllerCtx.InformerFactory.Core().V1().Endpoints(), controllerCtx.InformerFactory.Core().V1().Endpoints(),
controllerCtx.ClientBuilder.ClientOrDie("endpoint-controller"), controllerCtx.ClientBuilder.ClientOrDie("endpoint-controller"),
controllerCtx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration, controllerCtx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
).Run(int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), controllerCtx.Stop) ).Run(int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -399,7 +399,7 @@ func startReplicationController(ctx context.Context, controllerContext Controlle
controllerContext.InformerFactory.Core().V1().ReplicationControllers(), controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
controllerContext.ClientBuilder.ClientOrDie("replication-controller"), controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
replicationcontroller.BurstReplicas, replicationcontroller.BurstReplicas,
).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), controllerContext.Stop) ).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -409,7 +409,7 @@ func startPodGCController(ctx context.Context, controllerContext ControllerConte
controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Nodes(), controllerContext.InformerFactory.Core().V1().Nodes(),
int(controllerContext.ComponentConfig.PodGCController.TerminatedPodGCThreshold), int(controllerContext.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
).Run(controllerContext.Stop) ).Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -441,10 +441,10 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
go resourceQuotaController.Run(int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), controllerContext.Stop) go resourceQuotaController.Run(int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Done())
// Periodically the quota controller to detect new resource types // Periodically the quota controller to detect new resource types
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, controllerContext.Stop) go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -457,10 +457,10 @@ func startNamespaceController(ctx context.Context, controllerContext ControllerC
nsKubeconfig.QPS *= 20 nsKubeconfig.QPS *= 20
nsKubeconfig.Burst *= 100 nsKubeconfig.Burst *= 100
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig) namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
return startModifiedNamespaceController(controllerContext, namespaceKubeClient, nsKubeconfig) return startModifiedNamespaceController(ctx, controllerContext, namespaceKubeClient, nsKubeconfig)
} }
func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) { func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {
metadataClient, err := metadata.NewForConfig(nsKubeconfig) metadataClient, err := metadata.NewForConfig(nsKubeconfig)
if err != nil { if err != nil {
@ -473,11 +473,11 @@ func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient
namespaceKubeClient, namespaceKubeClient,
metadataClient, metadataClient,
discoverResourcesFn, discoverResourcesFn,
ctx.InformerFactory.Core().V1().Namespaces(), controllerContext.InformerFactory.Core().V1().Namespaces(),
ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
v1.FinalizerKubernetes, v1.FinalizerKubernetes,
) )
go namespaceController.Run(int(ctx.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Stop) go namespaceController.Run(int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -492,7 +492,7 @@ func startServiceAccountController(ctx context.Context, controllerContext Contro
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err) return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err)
} }
go sac.Run(1, controllerContext.Stop) go sac.Run(1, ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -500,7 +500,7 @@ func startTTLController(ctx context.Context, controllerContext ControllerContext
go ttlcontroller.NewTTLController( go ttlcontroller.NewTTLController(
controllerContext.InformerFactory.Core().V1().Nodes(), controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ClientBuilder.ClientOrDie("ttl-controller"), controllerContext.ClientBuilder.ClientOrDie("ttl-controller"),
).Run(5, controllerContext.Stop) ).Run(5, ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -539,11 +539,11 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
// Start the garbage collector. // Start the garbage collector.
workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs) workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
go garbageCollector.Run(workers, controllerContext.Stop) go garbageCollector.Run(workers, ctx.Done())
// Periodically refresh the RESTMapper with new discovery information and sync // Periodically refresh the RESTMapper with new discovery information and sync
// the garbage collector. // the garbage collector.
go garbageCollector.Sync(discoveryClient, 30*time.Second, controllerContext.Stop) go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Done())
return garbageCollector, true, nil return garbageCollector, true, nil
} }
@ -559,7 +559,7 @@ func startPVCProtectionController(ctx context.Context, controllerContext Control
if err != nil { if err != nil {
return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err) return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err)
} }
go pvcProtectionController.Run(1, controllerContext.Stop) go pvcProtectionController.Run(1, ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -568,7 +568,7 @@ func startPVProtectionController(ctx context.Context, controllerContext Controll
controllerContext.InformerFactory.Core().V1().PersistentVolumes(), controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
controllerContext.ClientBuilder.ClientOrDie("pv-protection-controller"), controllerContext.ClientBuilder.ClientOrDie("pv-protection-controller"),
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
).Run(1, controllerContext.Stop) ).Run(1, ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -579,7 +579,7 @@ func startTTLAfterFinishedController(ctx context.Context, controllerContext Cont
go ttlafterfinished.New( go ttlafterfinished.New(
controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"), controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
).Run(int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), controllerContext.Stop) ).Run(int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -681,6 +681,6 @@ func startStorageVersionGCController(ctx context.Context, controllerContext Cont
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
controllerContext.InformerFactory.Coordination().V1().Leases(), controllerContext.InformerFactory.Coordination().V1().Leases(),
controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(), controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(),
).Run(controllerContext.Stop) ).Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -150,7 +150,7 @@ func TestController_DiscoveryError(t *testing.T) {
} }
} }
_, _, err := startModifiedNamespaceController( _, _, err := startModifiedNamespaceController(
ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller")) context.TODO(), ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller"))
if test.expectedErr != (err != nil) { if test.expectedErr != (err != nil) {
t.Errorf("Namespace Controller test failed for use case: %v", name) t.Errorf("Namespace Controller test failed for use case: %v", name)
} }

View File

@ -37,7 +37,7 @@ func startEndpointSliceController(ctx context.Context, controllerContext Control
controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice, controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"), controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"),
controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration, controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
).Run(int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), controllerContext.Stop) ).Run(int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }
@ -49,6 +49,6 @@ func startEndpointSliceMirroringController(ctx context.Context, controllerContex
controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset, controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset,
controllerContext.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"), controllerContext.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"),
controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration, controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration,
).Run(int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs), controllerContext.Stop) ).Run(int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs), ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -58,6 +58,6 @@ func startDisruptionController(ctx context.Context, controllerContext Controller
controllerContext.RESTMapper, controllerContext.RESTMapper,
scaleClient, scaleClient,
client.Discovery(), client.Discovery(),
).Run(controllerContext.Stop) ).Run(ctx.Done())
return nil, true, nil return nil, true, nil
} }

View File

@ -31,6 +31,6 @@ func startClusterRoleAggregrationController(ctx context.Context, controllerConte
go clusterroleaggregation.NewClusterRoleAggregation( go clusterroleaggregation.NewClusterRoleAggregation(
controllerContext.InformerFactory.Rbac().V1().ClusterRoles(), controllerContext.InformerFactory.Rbac().V1().ClusterRoles(),
controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(), controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(),
).Run(5, controllerContext.Stop) ).Run(5, ctx.Done())
return nil, true, nil return nil, true, nil
} }