diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go index ea2f9db0598..99114cc2431 100644 --- a/cmd/kube-controller-manager/app/apps.go +++ b/cmd/kube-controller-manager/app/apps.go @@ -45,7 +45,7 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC if err != nil { return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err) } - go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), controllerContext.Stop) + go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Done()) return nil, true, nil } @@ -56,7 +56,7 @@ func startStatefulSetController(ctx context.Context, controllerContext Controlle controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), controllerContext.InformerFactory.Apps().V1().ControllerRevisions(), controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"), - ).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), controllerContext.Stop) + ).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done()) return nil, true, nil } @@ -66,7 +66,7 @@ func startReplicaSetController(ctx context.Context, controllerContext Controller controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, - ).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), controllerContext.Stop) + ).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Done()) return nil, true, nil } @@ -80,6 +80,6 @@ func startDeploymentController(ctx context.Context, controllerContext Controller if err != nil { return nil, true, fmt.Errorf("error creating Deployment controller: %v", err) } - go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), controllerContext.Stop) + go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/autoscaling.go b/cmd/kube-controller-manager/app/autoscaling.go index d61435634b6..53bca1109cf 100644 --- a/cmd/kube-controller-manager/app/autoscaling.go +++ b/cmd/kube-controller-manager/app/autoscaling.go @@ -40,37 +40,37 @@ func startHPAController(ctx context.Context, controllerContext ControllerContext return nil, false, nil } - return startHPAControllerWithRESTClient(controllerContext) + return startHPAControllerWithRESTClient(ctx, controllerContext) } -func startHPAControllerWithRESTClient(ctx ControllerContext) (controller.Interface, bool, error) { - clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") - hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") +func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") + hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery()) // invalidate the discovery information roughly once per resync interval our API // information is *at most* two resync intervals old. go custom_metrics.PeriodicallyInvalidate( apiVersionsGetter, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, - ctx.Stop) + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, + ctx.Done()) metricsClient := metrics.NewRESTMetricsClient( resourceclient.NewForConfigOrDie(clientConfig), - custom_metrics.NewForConfig(clientConfig, ctx.RESTMapper, apiVersionsGetter), + custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter), external_metrics.NewForConfigOrDie(clientConfig), ) - return startHPAControllerWithMetricsClient(ctx, metricsClient) + return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient) } -func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) { - hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") - hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") +func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) { + hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") + hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching, // so we want to re-fetch every time when we actually ask for it scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery()) - scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) if err != nil { return nil, false, err } @@ -79,15 +79,15 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me hpaClient.CoreV1(), scaleClient, hpaClient.AutoscalingV1(), - ctx.RESTMapper, + controllerContext.RESTMapper, metricsClient, - ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, - ).Run(ctx.Stop) + controllerContext.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, + ).Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go index 94f7114d230..fa363a4ce4e 100644 --- a/cmd/kube-controller-manager/app/batch.go +++ b/cmd/kube-controller-manager/app/batch.go @@ -36,7 +36,7 @@ func startJobController(ctx context.Context, controllerContext ControllerContext controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.ClientBuilder.ClientOrDie("job-controller"), - ).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), controllerContext.Stop) + ).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Done()) return nil, true, nil } @@ -49,7 +49,7 @@ func startCronJobController(ctx context.Context, controllerContext ControllerCon if err != nil { return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err) } - go cj2c.Run(int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), controllerContext.Stop) + go cj2c.Run(int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Done()) return nil, true, nil } cjc, err := cronjob.NewController( @@ -58,6 +58,6 @@ func startCronJobController(ctx context.Context, controllerContext ControllerCon if err != nil { return nil, true, fmt.Errorf("error creating CronJob controller: %v", err) } - go cjc.Run(controllerContext.Stop) + go cjc.Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/bootstrap.go b/cmd/kube-controller-manager/app/bootstrap.go index ca21fd904ee..2dbc0b160c8 100644 --- a/cmd/kube-controller-manager/app/bootstrap.go +++ b/cmd/kube-controller-manager/app/bootstrap.go @@ -34,7 +34,7 @@ func startBootstrapSignerController(ctx context.Context, controllerContext Contr if err != nil { return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err) } - go bsc.Run(controllerContext.Stop) + go bsc.Run(ctx.Done()) return nil, true, nil } @@ -47,6 +47,6 @@ func startTokenCleanerController(ctx context.Context, controllerContext Controll if err != nil { return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err) } - go tcc.Run(controllerContext.Stop) + go tcc.Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index 567e6101441..ca3e6c9f1a9 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -52,7 +52,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err) } - go kubeletServingSigner.Run(5, controllerContext.Stop) + go kubeletServingSigner.Run(5, ctx.Done()) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving") } @@ -62,7 +62,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err) } - go kubeletClientSigner.Run(5, controllerContext.Stop) + go kubeletClientSigner.Run(5, ctx.Done()) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet") } @@ -72,7 +72,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err) } - go kubeAPIServerClientSigner.Run(5, controllerContext.Stop) + go kubeAPIServerClientSigner.Run(5, ctx.Done()) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client") } @@ -82,7 +82,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err) } - go legacyUnknownSigner.Run(5, controllerContext.Stop) + go legacyUnknownSigner.Run(5, ctx.Done()) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown") } @@ -153,7 +153,7 @@ func startCSRApprovingController(ctx context.Context, controllerContext Controll controllerContext.ClientBuilder.ClientOrDie("certificate-controller"), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go approver.Run(5, controllerContext.Stop) + go approver.Run(5, ctx.Done()) return nil, true, nil } @@ -163,7 +163,7 @@ func startCSRCleanerController(ctx context.Context, controllerContext Controller controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go cleaner.Run(1, controllerContext.Stop) + go cleaner.Run(1, ctx.Done()) return nil, true, nil } @@ -189,6 +189,6 @@ func startRootCACertPublisher(ctx context.Context, controllerContext ControllerC if err != nil { return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err) } - go sac.Run(1, controllerContext.Stop) + go sac.Run(1, ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index b3739f23ee6..8ec7afe0758 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -229,8 +229,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { klog.Fatalf("error starting controllers: %v", err) } - controllerContext.InformerFactory.Start(controllerContext.Stop) - controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop) + controllerContext.InformerFactory.Start(stopCh) + controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh) close(controllerContext.InformersStarted) select {} @@ -352,9 +352,6 @@ type ControllerContext struct { // ExternalLoops is for a kube-controller-manager running with a cloud-controller-manager LoopMode ControllerLoopMode - // Stop is the stop channel - Stop <-chan struct{} - // InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe, // for an individual controller to start the shared informers. Before it is closed, they should not. InformersStarted chan struct{} @@ -535,7 +532,6 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien AvailableResources: availableResources, Cloud: cloud, LoopMode: loopMode, - Stop: stop, InformersStarted: make(chan struct{}), ResyncPeriod: ResyncPeriod(s), } @@ -556,7 +552,7 @@ func StartControllers(ctx context.Context, controllerCtx ControllerContext, star // Initialize the cloud provider with a reference to the clientBuilder only after token controller // has started in case the cloud provider uses the client builder. if controllerCtx.Cloud != nil { - controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, controllerCtx.Stop) + controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done()) } var controllerChecks []healthz.HealthChecker @@ -653,10 +649,10 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController if err != nil { return nil, true, fmt.Errorf("error creating Tokens controller: %v", err) } - go controller.Run(int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs), controllerContext.Stop) + go controller.Run(int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs), ctx.Done()) // start the first set of informers now so that other controllers can start - controllerContext.InformerFactory.Start(controllerContext.Stop) + controllerContext.InformerFactory.Start(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index f2b79b226f8..cd217818c4a 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -92,7 +92,7 @@ func startServiceController(ctx context.Context, controllerContext ControllerCon klog.Errorf("Failed to start service controller: %v", err) return nil, false, nil } - go serviceController.Run(controllerContext.Stop, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) + go serviceController.Run(ctx.Done(), int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) return nil, true, nil } @@ -168,7 +168,7 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo if err != nil { return nil, true, err } - go nodeIpamController.Run(controllerContext.Stop) + go nodeIpamController.Run(ctx.Done()) return nil, true, nil } @@ -193,7 +193,7 @@ func startNodeLifecycleController(ctx context.Context, controllerContext Control if err != nil { return nil, true, err } - go lifecycleController.Run(controllerContext.Stop) + go lifecycleController.Run(ctx.Done()) return nil, true, nil } @@ -212,7 +212,7 @@ func startCloudNodeLifecycleController(ctx context.Context, controllerContext Co return nil, false, nil } - go cloudNodeLifecycleController.Run(controllerContext.Stop) + go cloudNodeLifecycleController.Run(ctx.Done()) return nil, true, nil } @@ -252,7 +252,7 @@ func startRouteController(ctx context.Context, controllerContext ControllerConte controllerContext.InformerFactory.Core().V1().Nodes(), controllerContext.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDRs) - go routeController.Run(controllerContext.Stop, controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) + go routeController.Run(ctx.Done(), controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) return nil, true, nil } @@ -285,17 +285,17 @@ func startPersistentVolumeBinderController(ctx context.Context, controllerContex if volumeControllerErr != nil { return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr) } - go volumeController.Run(controllerContext.Stop) + go volumeController.Run(ctx.Done()) return nil, true, nil } -func startAttachDetachController(_ context.Context, ctx ControllerContext) (controller.Interface, bool, error) { - if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second { +func startAttachDetachController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + if controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second { return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period") } - csiNodeInformer := ctx.InformerFactory.Storage().V1().CSINodes() - csiDriverInformer := ctx.InformerFactory.Storage().V1().CSIDrivers() + csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes() + csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers() plugins, err := ProbeAttachableVolumePlugins() if err != nil { @@ -303,34 +303,34 @@ func startAttachDetachController(_ context.Context, ctx ControllerContext) (cont } filteredDialOptions, err := options.ParseVolumeHostFilters( - ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, - ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) + controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, + controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) if err != nil { return nil, true, err } attachDetachController, attachDetachControllerErr := attachdetach.NewAttachDetachController( - ctx.ClientBuilder.ClientOrDie("attachdetach-controller"), - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().Nodes(), - ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), - ctx.InformerFactory.Core().V1().PersistentVolumes(), + controllerContext.ClientBuilder.ClientOrDie("attachdetach-controller"), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + controllerContext.InformerFactory.Core().V1().PersistentVolumes(), csiNodeInformer, csiDriverInformer, - ctx.InformerFactory.Storage().V1().VolumeAttachments(), - ctx.Cloud, + controllerContext.InformerFactory.Storage().V1().VolumeAttachments(), + controllerContext.Cloud, plugins, - GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), - ctx.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync, - ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration, + GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), + controllerContext.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync, + controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration, attachdetach.DefaultTimerConfig, filteredDialOptions, ) if attachDetachControllerErr != nil { return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr) } - go attachDetachController.Run(ctx.Stop) + go attachDetachController.Run(ctx.Done()) return nil, true, nil } @@ -361,7 +361,7 @@ func startVolumeExpandController(ctx context.Context, controllerContext Controll if expandControllerErr != nil { return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr) } - go expandController.Run(controllerContext.Stop) + go expandController.Run(ctx.Done()) return nil, true, nil } return nil, false, nil @@ -376,7 +376,7 @@ func startEphemeralVolumeController(ctx context.Context, controllerContext Contr if err != nil { return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err) } - go ephemeralController.Run(int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), controllerContext.Stop) + go ephemeralController.Run(int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), ctx.Done()) return nil, true, nil } return nil, false, nil @@ -389,7 +389,7 @@ func startEndpointController(ctx context.Context, controllerCtx ControllerContex controllerCtx.InformerFactory.Core().V1().Endpoints(), controllerCtx.ClientBuilder.ClientOrDie("endpoint-controller"), controllerCtx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration, - ).Run(int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), controllerCtx.Stop) + ).Run(int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Done()) return nil, true, nil } @@ -399,7 +399,7 @@ func startReplicationController(ctx context.Context, controllerContext Controlle controllerContext.InformerFactory.Core().V1().ReplicationControllers(), controllerContext.ClientBuilder.ClientOrDie("replication-controller"), replicationcontroller.BurstReplicas, - ).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), controllerContext.Stop) + ).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Done()) return nil, true, nil } @@ -409,7 +409,7 @@ func startPodGCController(ctx context.Context, controllerContext ControllerConte controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Nodes(), int(controllerContext.ComponentConfig.PodGCController.TerminatedPodGCThreshold), - ).Run(controllerContext.Stop) + ).Run(ctx.Done()) return nil, true, nil } @@ -441,10 +441,10 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control if err != nil { return nil, false, err } - go resourceQuotaController.Run(int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), controllerContext.Stop) + go resourceQuotaController.Run(int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Done()) // Periodically the quota controller to detect new resource types - go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, controllerContext.Stop) + go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) return nil, true, nil } @@ -457,10 +457,10 @@ func startNamespaceController(ctx context.Context, controllerContext ControllerC nsKubeconfig.QPS *= 20 nsKubeconfig.Burst *= 100 namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig) - return startModifiedNamespaceController(controllerContext, namespaceKubeClient, nsKubeconfig) + return startModifiedNamespaceController(ctx, controllerContext, namespaceKubeClient, nsKubeconfig) } -func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) { +func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) { metadataClient, err := metadata.NewForConfig(nsKubeconfig) if err != nil { @@ -473,11 +473,11 @@ func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient namespaceKubeClient, metadataClient, discoverResourcesFn, - ctx.InformerFactory.Core().V1().Namespaces(), - ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, + controllerContext.InformerFactory.Core().V1().Namespaces(), + controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes, ) - go namespaceController.Run(int(ctx.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Stop) + go namespaceController.Run(int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Done()) return nil, true, nil } @@ -492,7 +492,7 @@ func startServiceAccountController(ctx context.Context, controllerContext Contro if err != nil { return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err) } - go sac.Run(1, controllerContext.Stop) + go sac.Run(1, ctx.Done()) return nil, true, nil } @@ -500,7 +500,7 @@ func startTTLController(ctx context.Context, controllerContext ControllerContext go ttlcontroller.NewTTLController( controllerContext.InformerFactory.Core().V1().Nodes(), controllerContext.ClientBuilder.ClientOrDie("ttl-controller"), - ).Run(5, controllerContext.Stop) + ).Run(5, ctx.Done()) return nil, true, nil } @@ -539,11 +539,11 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont // Start the garbage collector. workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs) - go garbageCollector.Run(workers, controllerContext.Stop) + go garbageCollector.Run(workers, ctx.Done()) // Periodically refresh the RESTMapper with new discovery information and sync // the garbage collector. - go garbageCollector.Sync(discoveryClient, 30*time.Second, controllerContext.Stop) + go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Done()) return garbageCollector, true, nil } @@ -559,7 +559,7 @@ func startPVCProtectionController(ctx context.Context, controllerContext Control if err != nil { return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err) } - go pvcProtectionController.Run(1, controllerContext.Stop) + go pvcProtectionController.Run(1, ctx.Done()) return nil, true, nil } @@ -568,7 +568,7 @@ func startPVProtectionController(ctx context.Context, controllerContext Controll controllerContext.InformerFactory.Core().V1().PersistentVolumes(), controllerContext.ClientBuilder.ClientOrDie("pv-protection-controller"), utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), - ).Run(1, controllerContext.Stop) + ).Run(1, ctx.Done()) return nil, true, nil } @@ -579,7 +579,7 @@ func startTTLAfterFinishedController(ctx context.Context, controllerContext Cont go ttlafterfinished.New( controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"), - ).Run(int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), controllerContext.Stop) + ).Run(int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Done()) return nil, true, nil } @@ -681,6 +681,6 @@ func startStorageVersionGCController(ctx context.Context, controllerContext Cont controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), controllerContext.InformerFactory.Coordination().V1().Leases(), controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(), - ).Run(controllerContext.Stop) + ).Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/core_test.go b/cmd/kube-controller-manager/app/core_test.go index c308a39a167..1e7513121d5 100644 --- a/cmd/kube-controller-manager/app/core_test.go +++ b/cmd/kube-controller-manager/app/core_test.go @@ -150,7 +150,7 @@ func TestController_DiscoveryError(t *testing.T) { } } _, _, err := startModifiedNamespaceController( - ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller")) + context.TODO(), ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller")) if test.expectedErr != (err != nil) { t.Errorf("Namespace Controller test failed for use case: %v", name) } diff --git a/cmd/kube-controller-manager/app/discovery.go b/cmd/kube-controller-manager/app/discovery.go index 7c9062836d7..52064c50c61 100644 --- a/cmd/kube-controller-manager/app/discovery.go +++ b/cmd/kube-controller-manager/app/discovery.go @@ -37,7 +37,7 @@ func startEndpointSliceController(ctx context.Context, controllerContext Control controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice, controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"), controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration, - ).Run(int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), controllerContext.Stop) + ).Run(int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Done()) return nil, true, nil } @@ -49,6 +49,6 @@ func startEndpointSliceMirroringController(ctx context.Context, controllerContex controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset, controllerContext.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"), controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration, - ).Run(int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs), controllerContext.Stop) + ).Run(int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs), ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go index 70af935315a..59d379ac333 100644 --- a/cmd/kube-controller-manager/app/policy.go +++ b/cmd/kube-controller-manager/app/policy.go @@ -58,6 +58,6 @@ func startDisruptionController(ctx context.Context, controllerContext Controller controllerContext.RESTMapper, scaleClient, client.Discovery(), - ).Run(controllerContext.Stop) + ).Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/rbac.go b/cmd/kube-controller-manager/app/rbac.go index cd5f94167eb..718be6955ef 100644 --- a/cmd/kube-controller-manager/app/rbac.go +++ b/cmd/kube-controller-manager/app/rbac.go @@ -31,6 +31,6 @@ func startClusterRoleAggregrationController(ctx context.Context, controllerConte go clusterroleaggregation.NewClusterRoleAggregation( controllerContext.InformerFactory.Rbac().V1().ClusterRoles(), controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(), - ).Run(5, controllerContext.Stop) + ).Run(5, ctx.Done()) return nil, true, nil }