diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go index 7b73b490928..99114cc2431 100644 --- a/cmd/kube-controller-manager/app/apps.go +++ b/cmd/kube-controller-manager/app/apps.go @@ -21,6 +21,7 @@ limitations under the License. package app import ( + "context" "fmt" "time" @@ -32,53 +33,53 @@ import ( "k8s.io/kubernetes/pkg/controller/statefulset" ) -func startDaemonSetController(ctx ControllerContext) (controller.Interface, bool, error) { +func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { dsc, err := daemon.NewDaemonSetsController( - ctx.InformerFactory.Apps().V1().DaemonSets(), - ctx.InformerFactory.Apps().V1().ControllerRevisions(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().Nodes(), - ctx.ClientBuilder.ClientOrDie("daemon-set-controller"), + controllerContext.InformerFactory.Apps().V1().DaemonSets(), + controllerContext.InformerFactory.Apps().V1().ControllerRevisions(), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"), flowcontrol.NewBackOff(1*time.Second, 15*time.Minute), ) if err != nil { return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err) } - go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop) + go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Done()) return nil, true, nil } -func startStatefulSetController(ctx ControllerContext) (controller.Interface, bool, error) { +func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go statefulset.NewStatefulSetController( - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Apps().V1().StatefulSets(), - ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), - ctx.InformerFactory.Apps().V1().ControllerRevisions(), - ctx.ClientBuilder.ClientOrDie("statefulset-controller"), - ).Run(int(ctx.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Stop) + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Apps().V1().StatefulSets(), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + controllerContext.InformerFactory.Apps().V1().ControllerRevisions(), + controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"), + ).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done()) return nil, true, nil } -func startReplicaSetController(ctx ControllerContext) (controller.Interface, bool, error) { +func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go replicaset.NewReplicaSetController( - ctx.InformerFactory.Apps().V1().ReplicaSets(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.ClientBuilder.ClientOrDie("replicaset-controller"), + controllerContext.InformerFactory.Apps().V1().ReplicaSets(), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, - ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop) + ).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Done()) return nil, true, nil } -func startDeploymentController(ctx ControllerContext) (controller.Interface, bool, error) { +func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { dc, err := deployment.NewDeploymentController( - ctx.InformerFactory.Apps().V1().Deployments(), - ctx.InformerFactory.Apps().V1().ReplicaSets(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.ClientBuilder.ClientOrDie("deployment-controller"), + controllerContext.InformerFactory.Apps().V1().Deployments(), + controllerContext.InformerFactory.Apps().V1().ReplicaSets(), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.ClientBuilder.ClientOrDie("deployment-controller"), ) if err != nil { return nil, true, fmt.Errorf("error creating Deployment controller: %v", err) } - go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop) + go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/autoscaling.go b/cmd/kube-controller-manager/app/autoscaling.go index cedab579c04..53bca1109cf 100644 --- a/cmd/kube-controller-manager/app/autoscaling.go +++ b/cmd/kube-controller-manager/app/autoscaling.go @@ -21,6 +21,8 @@ limitations under the License. package app import ( + "context" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/scale" @@ -33,42 +35,42 @@ import ( "k8s.io/metrics/pkg/client/external_metrics" ) -func startHPAController(ctx ControllerContext) (controller.Interface, bool, error) { - if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] { +func startHPAController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] { return nil, false, nil } - return startHPAControllerWithRESTClient(ctx) + return startHPAControllerWithRESTClient(ctx, controllerContext) } -func startHPAControllerWithRESTClient(ctx ControllerContext) (controller.Interface, bool, error) { - clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") - hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") +func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") + hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery()) // invalidate the discovery information roughly once per resync interval our API // information is *at most* two resync intervals old. go custom_metrics.PeriodicallyInvalidate( apiVersionsGetter, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, - ctx.Stop) + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, + ctx.Done()) metricsClient := metrics.NewRESTMetricsClient( resourceclient.NewForConfigOrDie(clientConfig), - custom_metrics.NewForConfig(clientConfig, ctx.RESTMapper, apiVersionsGetter), + custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter), external_metrics.NewForConfigOrDie(clientConfig), ) - return startHPAControllerWithMetricsClient(ctx, metricsClient) + return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient) } -func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) { - hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") - hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") +func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) { + hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") + hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching, // so we want to re-fetch every time when we actually ask for it scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery()) - scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) if err != nil { return nil, false, err } @@ -77,15 +79,15 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me hpaClient.CoreV1(), scaleClient, hpaClient.AutoscalingV1(), - ctx.RESTMapper, + controllerContext.RESTMapper, metricsClient, - ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, - ).Run(ctx.Stop) + controllerContext.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, + controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, + ).Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go index 1b649b809a3..fa363a4ce4e 100644 --- a/cmd/kube-controller-manager/app/batch.go +++ b/cmd/kube-controller-manager/app/batch.go @@ -21,6 +21,7 @@ limitations under the License. package app import ( + "context" "fmt" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -30,33 +31,33 @@ import ( kubefeatures "k8s.io/kubernetes/pkg/features" ) -func startJobController(ctx ControllerContext) (controller.Interface, bool, error) { +func startJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go job.NewController( - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Batch().V1().Jobs(), - ctx.ClientBuilder.ClientOrDie("job-controller"), - ).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop) + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Batch().V1().Jobs(), + controllerContext.ClientBuilder.ClientOrDie("job-controller"), + ).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Done()) return nil, true, nil } -func startCronJobController(ctx ControllerContext) (controller.Interface, bool, error) { +func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) { - cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(), - ctx.InformerFactory.Batch().V1().CronJobs(), - ctx.ClientBuilder.ClientOrDie("cronjob-controller"), + cj2c, err := cronjob.NewControllerV2(controllerContext.InformerFactory.Batch().V1().Jobs(), + controllerContext.InformerFactory.Batch().V1().CronJobs(), + controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"), ) if err != nil { return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err) } - go cj2c.Run(int(ctx.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Stop) + go cj2c.Run(int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Done()) return nil, true, nil } cjc, err := cronjob.NewController( - ctx.ClientBuilder.ClientOrDie("cronjob-controller"), + controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"), ) if err != nil { return nil, true, fmt.Errorf("error creating CronJob controller: %v", err) } - go cjc.Run(ctx.Stop) + go cjc.Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/bootstrap.go b/cmd/kube-controller-manager/app/bootstrap.go index 5b1adbccfff..2dbc0b160c8 100644 --- a/cmd/kube-controller-manager/app/bootstrap.go +++ b/cmd/kube-controller-manager/app/bootstrap.go @@ -17,35 +17,36 @@ limitations under the License. package app import ( + "context" "fmt" "k8s.io/controller-manager/controller" "k8s.io/kubernetes/pkg/controller/bootstrap" ) -func startBootstrapSignerController(ctx ControllerContext) (controller.Interface, bool, error) { +func startBootstrapSignerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { bsc, err := bootstrap.NewSigner( - ctx.ClientBuilder.ClientOrDie("bootstrap-signer"), - ctx.InformerFactory.Core().V1().Secrets(), - ctx.InformerFactory.Core().V1().ConfigMaps(), + controllerContext.ClientBuilder.ClientOrDie("bootstrap-signer"), + controllerContext.InformerFactory.Core().V1().Secrets(), + controllerContext.InformerFactory.Core().V1().ConfigMaps(), bootstrap.DefaultSignerOptions(), ) if err != nil { return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err) } - go bsc.Run(ctx.Stop) + go bsc.Run(ctx.Done()) return nil, true, nil } -func startTokenCleanerController(ctx ControllerContext) (controller.Interface, bool, error) { +func startTokenCleanerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { tcc, err := bootstrap.NewTokenCleaner( - ctx.ClientBuilder.ClientOrDie("token-cleaner"), - ctx.InformerFactory.Core().V1().Secrets(), + controllerContext.ClientBuilder.ClientOrDie("token-cleaner"), + controllerContext.InformerFactory.Core().V1().Secrets(), bootstrap.DefaultTokenCleanerOptions(), ) if err != nil { return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err) } - go tcc.Run(ctx.Stop) + go tcc.Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index 144266bdb47..ca3e6c9f1a9 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -21,6 +21,7 @@ limitations under the License. package app import ( + "context" "fmt" "k8s.io/controller-manager/controller" @@ -32,56 +33,56 @@ import ( csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config" ) -func startCSRSigningController(ctx ControllerContext) (controller.Interface, bool, error) { - missingSingleSigningFile := ctx.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || ctx.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == "" - if missingSingleSigningFile && !anySpecificFilesSet(ctx.ComponentConfig.CSRSigningController) { +func startCSRSigningController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + missingSingleSigningFile := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || controllerContext.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == "" + if missingSingleSigningFile && !anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) { klog.V(2).Info("skipping CSR signer controller because no csr cert/key was specified") return nil, false, nil } - if !missingSingleSigningFile && anySpecificFilesSet(ctx.ComponentConfig.CSRSigningController) { + if !missingSingleSigningFile && anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) { return nil, false, fmt.Errorf("cannot specify default and per controller certs at the same time") } - c := ctx.ClientBuilder.ClientOrDie("certificate-controller") - csrInformer := ctx.InformerFactory.Certificates().V1().CertificateSigningRequests() - certTTL := ctx.ComponentConfig.CSRSigningController.ClusterSigningDuration.Duration + c := controllerContext.ClientBuilder.ClientOrDie("certificate-controller") + csrInformer := controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests() + certTTL := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningDuration.Duration - if kubeletServingSignerCertFile, kubeletServingSignerKeyFile := getKubeletServingSignerFiles(ctx.ComponentConfig.CSRSigningController); len(kubeletServingSignerCertFile) > 0 || len(kubeletServingSignerKeyFile) > 0 { + if kubeletServingSignerCertFile, kubeletServingSignerKeyFile := getKubeletServingSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletServingSignerCertFile) > 0 || len(kubeletServingSignerKeyFile) > 0 { kubeletServingSigner, err := signer.NewKubeletServingCSRSigningController(c, csrInformer, kubeletServingSignerCertFile, kubeletServingSignerKeyFile, certTTL) if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err) } - go kubeletServingSigner.Run(5, ctx.Stop) + go kubeletServingSigner.Run(5, ctx.Done()) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving") } - if kubeletClientSignerCertFile, kubeletClientSignerKeyFile := getKubeletClientSignerFiles(ctx.ComponentConfig.CSRSigningController); len(kubeletClientSignerCertFile) > 0 || len(kubeletClientSignerKeyFile) > 0 { + if kubeletClientSignerCertFile, kubeletClientSignerKeyFile := getKubeletClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletClientSignerCertFile) > 0 || len(kubeletClientSignerKeyFile) > 0 { kubeletClientSigner, err := signer.NewKubeletClientCSRSigningController(c, csrInformer, kubeletClientSignerCertFile, kubeletClientSignerKeyFile, certTTL) if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err) } - go kubeletClientSigner.Run(5, ctx.Stop) + go kubeletClientSigner.Run(5, ctx.Done()) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet") } - if kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile := getKubeAPIServerClientSignerFiles(ctx.ComponentConfig.CSRSigningController); len(kubeAPIServerSignerCertFile) > 0 || len(kubeAPIServerSignerKeyFile) > 0 { + if kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile := getKubeAPIServerClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeAPIServerSignerCertFile) > 0 || len(kubeAPIServerSignerKeyFile) > 0 { kubeAPIServerClientSigner, err := signer.NewKubeAPIServerClientCSRSigningController(c, csrInformer, kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile, certTTL) if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err) } - go kubeAPIServerClientSigner.Run(5, ctx.Stop) + go kubeAPIServerClientSigner.Run(5, ctx.Done()) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client") } - if legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile := getLegacyUnknownSignerFiles(ctx.ComponentConfig.CSRSigningController); len(legacyUnknownSignerCertFile) > 0 || len(legacyUnknownSignerKeyFile) > 0 { + if legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile := getLegacyUnknownSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(legacyUnknownSignerCertFile) > 0 || len(legacyUnknownSignerKeyFile) > 0 { legacyUnknownSigner, err := signer.NewLegacyUnknownCSRSigningController(c, csrInformer, legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile, certTTL) if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err) } - go legacyUnknownSigner.Run(5, ctx.Stop) + go legacyUnknownSigner.Run(5, ctx.Done()) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown") } @@ -147,47 +148,47 @@ func getLegacyUnknownSignerFiles(config csrsigningconfig.CSRSigningControllerCon return config.ClusterSigningCertFile, config.ClusterSigningKeyFile } -func startCSRApprovingController(ctx ControllerContext) (controller.Interface, bool, error) { +func startCSRApprovingController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { approver := approver.NewCSRApprovingController( - ctx.ClientBuilder.ClientOrDie("certificate-controller"), - ctx.InformerFactory.Certificates().V1().CertificateSigningRequests(), + controllerContext.ClientBuilder.ClientOrDie("certificate-controller"), + controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go approver.Run(5, ctx.Stop) + go approver.Run(5, ctx.Done()) return nil, true, nil } -func startCSRCleanerController(ctx ControllerContext) (controller.Interface, bool, error) { +func startCSRCleanerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { cleaner := cleaner.NewCSRCleanerController( - ctx.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(), - ctx.InformerFactory.Certificates().V1().CertificateSigningRequests(), + controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(), + controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go cleaner.Run(1, ctx.Stop) + go cleaner.Run(1, ctx.Done()) return nil, true, nil } -func startRootCACertPublisher(ctx ControllerContext) (controller.Interface, bool, error) { +func startRootCACertPublisher(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { var ( rootCA []byte err error ) - if ctx.ComponentConfig.SAController.RootCAFile != "" { - if rootCA, err = readCA(ctx.ComponentConfig.SAController.RootCAFile); err != nil { - return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.ComponentConfig.SAController.RootCAFile, err) + if controllerContext.ComponentConfig.SAController.RootCAFile != "" { + if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil { + return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err) } } else { - rootCA = ctx.ClientBuilder.ConfigOrDie("root-ca-cert-publisher").CAData + rootCA = controllerContext.ClientBuilder.ConfigOrDie("root-ca-cert-publisher").CAData } sac, err := rootcacertpublisher.NewPublisher( - ctx.InformerFactory.Core().V1().ConfigMaps(), - ctx.InformerFactory.Core().V1().Namespaces(), - ctx.ClientBuilder.ClientOrDie("root-ca-cert-publisher"), + controllerContext.InformerFactory.Core().V1().ConfigMaps(), + controllerContext.InformerFactory.Core().V1().Namespaces(), + controllerContext.ClientBuilder.ClientOrDie("root-ca-cert-publisher"), rootCA, ) if err != nil { return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err) } - go sac.Run(1, ctx.Stop) + go sac.Run(1, ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 7fbab29261a..8ec7afe0758 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -225,12 +225,12 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { klog.Fatalf("error building controller context: %v", err) } controllerInitializers := initializersFunc(controllerContext.LoopMode) - if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil { + if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil { klog.Fatalf("error starting controllers: %v", err) } - controllerContext.InformerFactory.Start(controllerContext.Stop) - controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop) + controllerContext.InformerFactory.Start(stopCh) + controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh) close(controllerContext.InformersStarted) select {} @@ -265,9 +265,9 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { // Wrap saTokenControllerInitFunc to signal readiness for migration after starting // the controller. - startSATokenController = func(ctx ControllerContext) (controller.Interface, bool, error) { + startSATokenController = func(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { defer close(leaderMigrator.MigrationReady) - return saTokenControllerInitFunc(ctx) + return saTokenControllerInitFunc(ctx, controllerContext) } } @@ -352,9 +352,6 @@ type ControllerContext struct { // ExternalLoops is for a kube-controller-manager running with a cloud-controller-manager LoopMode ControllerLoopMode - // Stop is the stop channel - Stop <-chan struct{} - // InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe, // for an individual controller to start the shared informers. Before it is closed, they should not. InformersStarted chan struct{} @@ -377,7 +374,7 @@ func (c ControllerContext) IsControllerEnabled(name string) bool { // that requests no additional features from the controller manager. // Any error returned will cause the controller process to `Fatal` // The bool indicates whether the controller was enabled. -type InitFunc func(ctx ControllerContext) (controller controller.Interface, enabled bool, err error) +type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller controller.Interface, enabled bool, err error) // ControllerInitializersFunc is used to create a collection of initializers // given the loopMode. @@ -535,7 +532,6 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien AvailableResources: availableResources, Cloud: cloud, LoopMode: loopMode, - Stop: stop, InformersStarted: make(chan struct{}), ResyncPeriod: ResyncPeriod(s), } @@ -543,34 +539,34 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien } // StartControllers starts a set of controllers with a specified ControllerContext -func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, +func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error { // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest // If this fails, just return here and fail since other controllers won't be able to get credentials. if startSATokenController != nil { - if _, _, err := startSATokenController(ctx); err != nil { + if _, _, err := startSATokenController(ctx, controllerCtx); err != nil { return err } } // Initialize the cloud provider with a reference to the clientBuilder only after token controller // has started in case the cloud provider uses the client builder. - if ctx.Cloud != nil { - ctx.Cloud.Initialize(ctx.ClientBuilder, ctx.Stop) + if controllerCtx.Cloud != nil { + controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done()) } var controllerChecks []healthz.HealthChecker for controllerName, initFn := range controllers { - if !ctx.IsControllerEnabled(controllerName) { + if !controllerCtx.IsControllerEnabled(controllerName) { klog.Warningf("%q is disabled", controllerName) continue } - time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) + time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) klog.V(1).Infof("Starting %q", controllerName) - ctrl, started, err := initFn(ctx) + ctrl, started, err := initFn(ctx, controllerCtx) if err != nil { klog.Errorf("Error starting %q", controllerName) return err @@ -613,25 +609,25 @@ type serviceAccountTokenControllerStarter struct { rootClientBuilder clientbuilder.ControllerClientBuilder } -func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (controller.Interface, bool, error) { - if !ctx.IsControllerEnabled(saTokenControllerName) { +func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + if !controllerContext.IsControllerEnabled(saTokenControllerName) { klog.Warningf("%q is disabled", saTokenControllerName) return nil, false, nil } - if len(ctx.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 { + if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 { klog.Warningf("%q is disabled because there is no private key", saTokenControllerName) return nil, false, nil } - privateKey, err := keyutil.PrivateKeyFromFile(ctx.ComponentConfig.SAController.ServiceAccountKeyFile) + privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) if err != nil { return nil, true, fmt.Errorf("error reading key for service account token controller: %v", err) } var rootCA []byte - if ctx.ComponentConfig.SAController.RootCAFile != "" { - if rootCA, err = readCA(ctx.ComponentConfig.SAController.RootCAFile); err != nil { - return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.ComponentConfig.SAController.RootCAFile, err) + if controllerContext.ComponentConfig.SAController.RootCAFile != "" { + if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil { + return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err) } } else { rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData @@ -642,8 +638,8 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController return nil, false, fmt.Errorf("failed to build token generator: %v", err) } controller, err := serviceaccountcontroller.NewTokensController( - ctx.InformerFactory.Core().V1().ServiceAccounts(), - ctx.InformerFactory.Core().V1().Secrets(), + controllerContext.InformerFactory.Core().V1().ServiceAccounts(), + controllerContext.InformerFactory.Core().V1().Secrets(), c.rootClientBuilder.ClientOrDie("tokens-controller"), serviceaccountcontroller.TokensControllerOptions{ TokenGenerator: tokenGenerator, @@ -653,10 +649,10 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController if err != nil { return nil, true, fmt.Errorf("error creating Tokens controller: %v", err) } - go controller.Run(int(ctx.ComponentConfig.SAController.ConcurrentSATokenSyncs), ctx.Stop) + go controller.Run(int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs), ctx.Done()) // start the first set of informers now so that other controllers can start - ctx.InformerFactory.Start(ctx.Stop) + controllerContext.InformerFactory.Start(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index b49e7c1f215..cd217818c4a 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -21,6 +21,7 @@ limitations under the License. package app import ( + "context" "errors" "fmt" "net" @@ -77,13 +78,13 @@ const ( defaultNodeMaskCIDRIPv6 = 64 ) -func startServiceController(ctx ControllerContext) (controller.Interface, bool, error) { +func startServiceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { serviceController, err := servicecontroller.New( - ctx.Cloud, - ctx.ClientBuilder.ClientOrDie("service-controller"), - ctx.InformerFactory.Core().V1().Services(), - ctx.InformerFactory.Core().V1().Nodes(), - ctx.ComponentConfig.KubeCloudShared.ClusterName, + controllerContext.Cloud, + controllerContext.ClientBuilder.ClientOrDie("service-controller"), + controllerContext.InformerFactory.Core().V1().Services(), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.ComponentConfig.KubeCloudShared.ClusterName, utilfeature.DefaultFeatureGate, ) if err != nil { @@ -91,21 +92,21 @@ func startServiceController(ctx ControllerContext) (controller.Interface, bool, klog.Errorf("Failed to start service controller: %v", err) return nil, false, nil } - go serviceController.Run(ctx.Stop, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) + go serviceController.Run(ctx.Done(), int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) return nil, true, nil } -func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool, error) { +func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { var serviceCIDR *net.IPNet var secondaryServiceCIDR *net.IPNet // should we start nodeIPAM - if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs { + if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs { return nil, false, nil } // failure: bad cidrs in config - clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR) + clusterCIDRs, dualStack, err := processCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR) if err != nil { return nil, false, err } @@ -121,17 +122,17 @@ func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool, } // service cidr processing - if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 { - _, serviceCIDR, err = netutils.ParseCIDRSloppy(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR) + if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 { + _, serviceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR) if err != nil { - klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.ServiceCIDR, err) + klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR, err) } } - if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 { - _, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR) + if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 { + _, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR) if err != nil { - klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err) + klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err) } } @@ -149,60 +150,60 @@ func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool, // only --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 supported with dual stack clusters. // --node-cidr-mask-size flag is incompatible with dual stack clusters. - nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(ctx.ComponentConfig.NodeIPAMController, clusterCIDRs) + nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(controllerContext.ComponentConfig.NodeIPAMController, clusterCIDRs) if err != nil { return nil, false, err } nodeIpamController, err := nodeipamcontroller.NewNodeIpamController( - ctx.InformerFactory.Core().V1().Nodes(), - ctx.Cloud, - ctx.ClientBuilder.ClientOrDie("node-controller"), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.Cloud, + controllerContext.ClientBuilder.ClientOrDie("node-controller"), clusterCIDRs, serviceCIDR, secondaryServiceCIDR, nodeCIDRMaskSizes, - ipam.CIDRAllocatorType(ctx.ComponentConfig.KubeCloudShared.CIDRAllocatorType), + ipam.CIDRAllocatorType(controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType), ) if err != nil { return nil, true, err } - go nodeIpamController.Run(ctx.Stop) + go nodeIpamController.Run(ctx.Done()) return nil, true, nil } -func startNodeLifecycleController(ctx ControllerContext) (controller.Interface, bool, error) { +func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController( - ctx.InformerFactory.Coordination().V1().Leases(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().Nodes(), - ctx.InformerFactory.Apps().V1().DaemonSets(), + controllerContext.InformerFactory.Coordination().V1().Leases(), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.InformerFactory.Apps().V1().DaemonSets(), // node lifecycle controller uses existing cluster role from node-controller - ctx.ClientBuilder.ClientOrDie("node-controller"), - ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, - ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration, - ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration, - ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration, - ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate, - ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate, - ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold, - ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold, - ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager, + controllerContext.ClientBuilder.ClientOrDie("node-controller"), + controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, + controllerContext.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration, + controllerContext.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration, + controllerContext.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration, + controllerContext.ComponentConfig.NodeLifecycleController.NodeEvictionRate, + controllerContext.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate, + controllerContext.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold, + controllerContext.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold, + controllerContext.ComponentConfig.NodeLifecycleController.EnableTaintManager, ) if err != nil { return nil, true, err } - go lifecycleController.Run(ctx.Stop) + go lifecycleController.Run(ctx.Done()) return nil, true, nil } -func startCloudNodeLifecycleController(ctx ControllerContext) (controller.Interface, bool, error) { +func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController( - ctx.InformerFactory.Core().V1().Nodes(), + controllerContext.InformerFactory.Core().V1().Nodes(), // cloud node lifecycle controller uses existing cluster role from node-controller - ctx.ClientBuilder.ClientOrDie("node-controller"), - ctx.Cloud, - ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, + controllerContext.ClientBuilder.ClientOrDie("node-controller"), + controllerContext.Cloud, + controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, ) if err != nil { // the controller manager should continue to run if the "Instances" interface is not @@ -211,27 +212,27 @@ func startCloudNodeLifecycleController(ctx ControllerContext) (controller.Interf return nil, false, nil } - go cloudNodeLifecycleController.Run(ctx.Stop) + go cloudNodeLifecycleController.Run(ctx.Done()) return nil, true, nil } -func startRouteController(ctx ControllerContext) (controller.Interface, bool, error) { - if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes { - klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes) +func startRouteController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes { + klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes) return nil, false, nil } - if ctx.Cloud == nil { + if controllerContext.Cloud == nil { klog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.") return nil, false, nil } - routes, ok := ctx.Cloud.Routes() + routes, ok := controllerContext.Cloud.Routes() if !ok { klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") return nil, false, nil } // failure: bad cidrs in config - clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR) + clusterCIDRs, dualStack, err := processCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR) if err != nil { return nil, false, err } @@ -247,54 +248,54 @@ func startRouteController(ctx ControllerContext) (controller.Interface, bool, er } routeController := routecontroller.New(routes, - ctx.ClientBuilder.ClientOrDie("route-controller"), - ctx.InformerFactory.Core().V1().Nodes(), - ctx.ComponentConfig.KubeCloudShared.ClusterName, + controllerContext.ClientBuilder.ClientOrDie("route-controller"), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDRs) - go routeController.Run(ctx.Stop, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) + go routeController.Run(ctx.Done(), controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) return nil, true, nil } -func startPersistentVolumeBinderController(ctx ControllerContext) (controller.Interface, bool, error) { - plugins, err := ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) +func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + plugins, err := ProbeControllerVolumePlugins(controllerContext.Cloud, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) if err != nil { return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err) } filteredDialOptions, err := options.ParseVolumeHostFilters( - ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, - ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) + controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, + controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) if err != nil { return nil, true, err } params := persistentvolumecontroller.ControllerParameters{ - KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"), - SyncPeriod: ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration, + KubeClient: controllerContext.ClientBuilder.ClientOrDie("persistent-volume-binder"), + SyncPeriod: controllerContext.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration, VolumePlugins: plugins, - Cloud: ctx.Cloud, - ClusterName: ctx.ComponentConfig.KubeCloudShared.ClusterName, - VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(), - ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), - ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(), - PodInformer: ctx.InformerFactory.Core().V1().Pods(), - NodeInformer: ctx.InformerFactory.Core().V1().Nodes(), - EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning, + Cloud: controllerContext.Cloud, + ClusterName: controllerContext.ComponentConfig.KubeCloudShared.ClusterName, + VolumeInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumes(), + ClaimInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + ClassInformer: controllerContext.InformerFactory.Storage().V1().StorageClasses(), + PodInformer: controllerContext.InformerFactory.Core().V1().Pods(), + NodeInformer: controllerContext.InformerFactory.Core().V1().Nodes(), + EnableDynamicProvisioning: controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning, FilteredDialOptions: filteredDialOptions, } volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params) if volumeControllerErr != nil { return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr) } - go volumeController.Run(ctx.Stop) + go volumeController.Run(ctx.Done()) return nil, true, nil } -func startAttachDetachController(ctx ControllerContext) (controller.Interface, bool, error) { - if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second { +func startAttachDetachController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + if controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second { return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period") } - csiNodeInformer := ctx.InformerFactory.Storage().V1().CSINodes() - csiDriverInformer := ctx.InformerFactory.Storage().V1().CSIDrivers() + csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes() + csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers() plugins, err := ProbeAttachableVolumePlugins() if err != nil { @@ -302,55 +303,55 @@ func startAttachDetachController(ctx ControllerContext) (controller.Interface, b } filteredDialOptions, err := options.ParseVolumeHostFilters( - ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, - ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) + controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, + controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) if err != nil { return nil, true, err } attachDetachController, attachDetachControllerErr := attachdetach.NewAttachDetachController( - ctx.ClientBuilder.ClientOrDie("attachdetach-controller"), - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().Nodes(), - ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), - ctx.InformerFactory.Core().V1().PersistentVolumes(), + controllerContext.ClientBuilder.ClientOrDie("attachdetach-controller"), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + controllerContext.InformerFactory.Core().V1().PersistentVolumes(), csiNodeInformer, csiDriverInformer, - ctx.InformerFactory.Storage().V1().VolumeAttachments(), - ctx.Cloud, + controllerContext.InformerFactory.Storage().V1().VolumeAttachments(), + controllerContext.Cloud, plugins, - GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), - ctx.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync, - ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration, + GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), + controllerContext.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync, + controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration, attachdetach.DefaultTimerConfig, filteredDialOptions, ) if attachDetachControllerErr != nil { return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr) } - go attachDetachController.Run(ctx.Stop) + go attachDetachController.Run(ctx.Done()) return nil, true, nil } -func startVolumeExpandController(ctx ControllerContext) (controller.Interface, bool, error) { +func startVolumeExpandController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { - plugins, err := ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) + plugins, err := ProbeExpandableVolumePlugins(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) if err != nil { return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err) } csiTranslator := csitrans.New() filteredDialOptions, err := options.ParseVolumeHostFilters( - ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, - ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) + controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, + controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) if err != nil { return nil, true, err } expandController, expandControllerErr := expand.NewExpandController( - ctx.ClientBuilder.ClientOrDie("expand-controller"), - ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), - ctx.InformerFactory.Core().V1().PersistentVolumes(), - ctx.Cloud, + controllerContext.ClientBuilder.ClientOrDie("expand-controller"), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + controllerContext.InformerFactory.Core().V1().PersistentVolumes(), + controllerContext.Cloud, plugins, csiTranslator, csimigration.NewPluginManager(csiTranslator, utilfeature.DefaultFeatureGate), @@ -360,74 +361,74 @@ func startVolumeExpandController(ctx ControllerContext) (controller.Interface, b if expandControllerErr != nil { return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr) } - go expandController.Run(ctx.Stop) + go expandController.Run(ctx.Done()) return nil, true, nil } return nil, false, nil } -func startEphemeralVolumeController(ctx ControllerContext) (controller.Interface, bool, error) { +func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) { ephemeralController, err := ephemeral.NewController( - ctx.ClientBuilder.ClientOrDie("ephemeral-volume-controller"), - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().PersistentVolumeClaims()) + controllerContext.ClientBuilder.ClientOrDie("ephemeral-volume-controller"), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims()) if err != nil { return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err) } - go ephemeralController.Run(int(ctx.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), ctx.Stop) + go ephemeralController.Run(int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), ctx.Done()) return nil, true, nil } return nil, false, nil } -func startEndpointController(ctx ControllerContext) (controller.Interface, bool, error) { +func startEndpointController(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error) { go endpointcontroller.NewEndpointController( - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().Services(), - ctx.InformerFactory.Core().V1().Endpoints(), - ctx.ClientBuilder.ClientOrDie("endpoint-controller"), - ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration, - ).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop) + controllerCtx.InformerFactory.Core().V1().Pods(), + controllerCtx.InformerFactory.Core().V1().Services(), + controllerCtx.InformerFactory.Core().V1().Endpoints(), + controllerCtx.ClientBuilder.ClientOrDie("endpoint-controller"), + controllerCtx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration, + ).Run(int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Done()) return nil, true, nil } -func startReplicationController(ctx ControllerContext) (controller.Interface, bool, error) { +func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go replicationcontroller.NewReplicationManager( - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().ReplicationControllers(), - ctx.ClientBuilder.ClientOrDie("replication-controller"), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().ReplicationControllers(), + controllerContext.ClientBuilder.ClientOrDie("replication-controller"), replicationcontroller.BurstReplicas, - ).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop) + ).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Done()) return nil, true, nil } -func startPodGCController(ctx ControllerContext) (controller.Interface, bool, error) { +func startPodGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go podgc.NewPodGC( - ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"), - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().Nodes(), - int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold), - ).Run(ctx.Stop) + controllerContext.ClientBuilder.ClientOrDie("pod-garbage-collector"), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().Nodes(), + int(controllerContext.ComponentConfig.PodGCController.TerminatedPodGCThreshold), + ).Run(ctx.Done()) return nil, true, nil } -func startResourceQuotaController(ctx ControllerContext) (controller.Interface, bool, error) { - resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") - resourceQuotaControllerDiscoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller") +func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller") + resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller") discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources - listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource) + listerFuncForResource := generic.ListerFuncForResourceFunc(controllerContext.InformerFactory.ForResource) quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource) resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{ QuotaClient: resourceQuotaControllerClient.CoreV1(), - ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(), - ResyncPeriod: pkgcontroller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration), - InformerFactory: ctx.ObjectOrMetadataInformerFactory, - ReplenishmentResyncPeriod: ctx.ResyncPeriod, + ResourceQuotaInformer: controllerContext.InformerFactory.Core().V1().ResourceQuotas(), + ResyncPeriod: pkgcontroller.StaticResyncPeriodFunc(controllerContext.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration), + InformerFactory: controllerContext.ObjectOrMetadataInformerFactory, + ReplenishmentResyncPeriod: controllerContext.ResyncPeriod, DiscoveryFunc: discoveryFunc, IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, - InformersStarted: ctx.InformersStarted, + InformersStarted: controllerContext.InformersStarted, Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), } if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil { @@ -440,26 +441,26 @@ func startResourceQuotaController(ctx ControllerContext) (controller.Interface, if err != nil { return nil, false, err } - go resourceQuotaController.Run(int(ctx.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Stop) + go resourceQuotaController.Run(int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Done()) // Periodically the quota controller to detect new resource types - go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop) + go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) return nil, true, nil } -func startNamespaceController(ctx ControllerContext) (controller.Interface, bool, error) { +func startNamespaceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { // the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls // the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource // including events), takes ~10 seconds by default. - nsKubeconfig := ctx.ClientBuilder.ConfigOrDie("namespace-controller") + nsKubeconfig := controllerContext.ClientBuilder.ConfigOrDie("namespace-controller") nsKubeconfig.QPS *= 20 nsKubeconfig.Burst *= 100 namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig) - return startModifiedNamespaceController(ctx, namespaceKubeClient, nsKubeconfig) + return startModifiedNamespaceController(ctx, controllerContext, namespaceKubeClient, nsKubeconfig) } -func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) { +func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) { metadataClient, err := metadata.NewForConfig(nsKubeconfig) if err != nil { @@ -472,46 +473,46 @@ func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient namespaceKubeClient, metadataClient, discoverResourcesFn, - ctx.InformerFactory.Core().V1().Namespaces(), - ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, + controllerContext.InformerFactory.Core().V1().Namespaces(), + controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes, ) - go namespaceController.Run(int(ctx.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Stop) + go namespaceController.Run(int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Done()) return nil, true, nil } -func startServiceAccountController(ctx ControllerContext) (controller.Interface, bool, error) { +func startServiceAccountController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { sac, err := serviceaccountcontroller.NewServiceAccountsController( - ctx.InformerFactory.Core().V1().ServiceAccounts(), - ctx.InformerFactory.Core().V1().Namespaces(), - ctx.ClientBuilder.ClientOrDie("service-account-controller"), + controllerContext.InformerFactory.Core().V1().ServiceAccounts(), + controllerContext.InformerFactory.Core().V1().Namespaces(), + controllerContext.ClientBuilder.ClientOrDie("service-account-controller"), serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), ) if err != nil { return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err) } - go sac.Run(1, ctx.Stop) + go sac.Run(1, ctx.Done()) return nil, true, nil } -func startTTLController(ctx ControllerContext) (controller.Interface, bool, error) { +func startTTLController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go ttlcontroller.NewTTLController( - ctx.InformerFactory.Core().V1().Nodes(), - ctx.ClientBuilder.ClientOrDie("ttl-controller"), - ).Run(5, ctx.Stop) + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.ClientBuilder.ClientOrDie("ttl-controller"), + ).Run(5, ctx.Done()) return nil, true, nil } -func startGarbageCollectorController(ctx ControllerContext) (controller.Interface, bool, error) { - if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector { +func startGarbageCollectorController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + if !controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector { return nil, false, nil } - gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector") - discoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector") + gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector") + discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector") - config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector") + config := controllerContext.ClientBuilder.ConfigOrDie("generic-garbage-collector") // Increase garbage collector controller's throughput: each object deletion takes two API calls, // so to get |config.QPS| deletion rate we need to allow 2x more requests for this controller. config.QPS *= 2 @@ -521,64 +522,64 @@ func startGarbageCollectorController(ctx ControllerContext) (controller.Interfac } ignoredResources := make(map[schema.GroupResource]struct{}) - for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources { + for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources { ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{} } garbageCollector, err := garbagecollector.NewGarbageCollector( gcClientset, metadataClient, - ctx.RESTMapper, + controllerContext.RESTMapper, ignoredResources, - ctx.ObjectOrMetadataInformerFactory, - ctx.InformersStarted, + controllerContext.ObjectOrMetadataInformerFactory, + controllerContext.InformersStarted, ) if err != nil { return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err) } // Start the garbage collector. - workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs) - go garbageCollector.Run(workers, ctx.Stop) + workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs) + go garbageCollector.Run(workers, ctx.Done()) // Periodically refresh the RESTMapper with new discovery information and sync // the garbage collector. - go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Stop) + go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Done()) return garbageCollector, true, nil } -func startPVCProtectionController(ctx ControllerContext) (controller.Interface, bool, error) { +func startPVCProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { pvcProtectionController, err := pvcprotection.NewPVCProtectionController( - ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.ClientBuilder.ClientOrDie("pvc-protection-controller"), utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume), ) if err != nil { return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err) } - go pvcProtectionController.Run(1, ctx.Stop) + go pvcProtectionController.Run(1, ctx.Done()) return nil, true, nil } -func startPVProtectionController(ctx ControllerContext) (controller.Interface, bool, error) { +func startPVProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go pvprotection.NewPVProtectionController( - ctx.InformerFactory.Core().V1().PersistentVolumes(), - ctx.ClientBuilder.ClientOrDie("pv-protection-controller"), + controllerContext.InformerFactory.Core().V1().PersistentVolumes(), + controllerContext.ClientBuilder.ClientOrDie("pv-protection-controller"), utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), - ).Run(1, ctx.Stop) + ).Run(1, ctx.Done()) return nil, true, nil } -func startTTLAfterFinishedController(ctx ControllerContext) (controller.Interface, bool, error) { +func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.TTLAfterFinished) { return nil, false, nil } go ttlafterfinished.New( - ctx.InformerFactory.Batch().V1().Jobs(), - ctx.ClientBuilder.ClientOrDie("ttl-after-finished-controller"), - ).Run(int(ctx.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Stop) + controllerContext.InformerFactory.Batch().V1().Jobs(), + controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"), + ).Run(int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Done()) return nil, true, nil } @@ -674,11 +675,12 @@ func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, cl } return sortedSizes(ipv4Mask, ipv6Mask), nil } -func startStorageVersionGCController(ctx ControllerContext) (controller.Interface, bool, error) { + +func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go storageversiongc.NewStorageVersionGC( - ctx.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), - ctx.InformerFactory.Coordination().V1().Leases(), - ctx.InformerFactory.Internal().V1alpha1().StorageVersions(), - ).Run(ctx.Stop) + controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), + controllerContext.InformerFactory.Coordination().V1().Leases(), + controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(), + ).Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/core_test.go b/cmd/kube-controller-manager/app/core_test.go index 6415e83ef67..1e7513121d5 100644 --- a/cmd/kube-controller-manager/app/core_test.go +++ b/cmd/kube-controller-manager/app/core_test.go @@ -17,6 +17,7 @@ limitations under the License. package app import ( + "context" "testing" "time" @@ -104,7 +105,7 @@ func possibleDiscoveryResource() []*metav1.APIResourceList { } } -type controllerInitFunc func(ControllerContext) (controller.Interface, bool, error) +type controllerInitFunc func(context.Context, ControllerContext) (controller.Interface, bool, error) func TestController_DiscoveryError(t *testing.T) { controllerInitFuncMap := map[string]controllerInitFunc{ @@ -143,13 +144,13 @@ func TestController_DiscoveryError(t *testing.T) { InformersStarted: make(chan struct{}), } for funcName, controllerInit := range controllerInitFuncMap { - _, _, err := controllerInit(ctx) + _, _, err := controllerInit(context.TODO(), ctx) if test.expectedErr != (err != nil) { t.Errorf("%v test failed for use case: %v", funcName, name) } } _, _, err := startModifiedNamespaceController( - ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller")) + context.TODO(), ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller")) if test.expectedErr != (err != nil) { t.Errorf("Namespace Controller test failed for use case: %v", name) } diff --git a/cmd/kube-controller-manager/app/discovery.go b/cmd/kube-controller-manager/app/discovery.go index ad9129a9b9d..52064c50c61 100644 --- a/cmd/kube-controller-manager/app/discovery.go +++ b/cmd/kube-controller-manager/app/discovery.go @@ -21,32 +21,34 @@ limitations under the License. package app import ( + "context" + "k8s.io/controller-manager/controller" endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice" endpointslicemirroringcontroller "k8s.io/kubernetes/pkg/controller/endpointslicemirroring" ) -func startEndpointSliceController(ctx ControllerContext) (controller.Interface, bool, error) { +func startEndpointSliceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go endpointslicecontroller.NewController( - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().Services(), - ctx.InformerFactory.Core().V1().Nodes(), - ctx.InformerFactory.Discovery().V1().EndpointSlices(), - ctx.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice, - ctx.ClientBuilder.ClientOrDie("endpointslice-controller"), - ctx.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration, - ).Run(int(ctx.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Stop) + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Core().V1().Services(), + controllerContext.InformerFactory.Core().V1().Nodes(), + controllerContext.InformerFactory.Discovery().V1().EndpointSlices(), + controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice, + controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"), + controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration, + ).Run(int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Done()) return nil, true, nil } -func startEndpointSliceMirroringController(ctx ControllerContext) (controller.Interface, bool, error) { +func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go endpointslicemirroringcontroller.NewController( - ctx.InformerFactory.Core().V1().Endpoints(), - ctx.InformerFactory.Discovery().V1().EndpointSlices(), - ctx.InformerFactory.Core().V1().Services(), - ctx.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset, - ctx.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"), - ctx.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration, - ).Run(int(ctx.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs), ctx.Stop) + controllerContext.InformerFactory.Core().V1().Endpoints(), + controllerContext.InformerFactory.Discovery().V1().EndpointSlices(), + controllerContext.InformerFactory.Core().V1().Services(), + controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset, + controllerContext.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"), + controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration, + ).Run(int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs), ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go index 3f16e2ac5f2..59d379ac333 100644 --- a/cmd/kube-controller-manager/app/policy.go +++ b/cmd/kube-controller-manager/app/policy.go @@ -21,6 +21,8 @@ limitations under the License. package app import ( + "context" + "k8s.io/klog/v2" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -31,31 +33,31 @@ import ( kubefeatures "k8s.io/kubernetes/pkg/features" ) -func startDisruptionController(ctx ControllerContext) (controller.Interface, bool, error) { +func startDisruptionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) { klog.InfoS("Refusing to start disruption because the PodDisruptionBudget feature is disabled") return nil, false, nil } - client := ctx.ClientBuilder.ClientOrDie("disruption-controller") - config := ctx.ClientBuilder.ConfigOrDie("disruption-controller") + client := controllerContext.ClientBuilder.ClientOrDie("disruption-controller") + config := controllerContext.ClientBuilder.ConfigOrDie("disruption-controller") scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery()) - scaleClient, err := scale.NewForConfig(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + scaleClient, err := scale.NewForConfig(config, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) if err != nil { return nil, false, err } go disruption.NewDisruptionController( - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Policy().V1().PodDisruptionBudgets(), - ctx.InformerFactory.Core().V1().ReplicationControllers(), - ctx.InformerFactory.Apps().V1().ReplicaSets(), - ctx.InformerFactory.Apps().V1().Deployments(), - ctx.InformerFactory.Apps().V1().StatefulSets(), + controllerContext.InformerFactory.Core().V1().Pods(), + controllerContext.InformerFactory.Policy().V1().PodDisruptionBudgets(), + controllerContext.InformerFactory.Core().V1().ReplicationControllers(), + controllerContext.InformerFactory.Apps().V1().ReplicaSets(), + controllerContext.InformerFactory.Apps().V1().Deployments(), + controllerContext.InformerFactory.Apps().V1().StatefulSets(), client, - ctx.RESTMapper, + controllerContext.RESTMapper, scaleClient, client.Discovery(), - ).Run(ctx.Stop) + ).Run(ctx.Done()) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/rbac.go b/cmd/kube-controller-manager/app/rbac.go index 579baeeee50..718be6955ef 100644 --- a/cmd/kube-controller-manager/app/rbac.go +++ b/cmd/kube-controller-manager/app/rbac.go @@ -17,18 +17,20 @@ limitations under the License. package app import ( + "context" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/controller-manager/controller" "k8s.io/kubernetes/pkg/controller/clusterroleaggregation" ) -func startClusterRoleAggregrationController(ctx ControllerContext) (controller.Interface, bool, error) { - if !ctx.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] { +func startClusterRoleAggregrationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] { return nil, false, nil } go clusterroleaggregation.NewClusterRoleAggregation( - ctx.InformerFactory.Rbac().V1().ClusterRoles(), - ctx.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(), - ).Run(5, ctx.Stop) + controllerContext.InformerFactory.Rbac().V1().ClusterRoles(), + controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(), + ).Run(5, ctx.Done()) return nil, true, nil }