Merge pull request #101125 from damemi/kcm-wire-contexts

Set up kube-controller-manager functions to accept contexts
Kubernetes Prow Robot 2021-09-29 07:02:49 -07:00 committed by GitHub
commit bceefb7a86
11 changed files with 352 additions and 341 deletions
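The heart of the change, repeated across every file below: each controller start function gains a leading context.Context parameter, the old ControllerContext argument is renamed to controllerContext, and the removed Stop channel is replaced by ctx.Done(). A minimal runnable sketch of the new shape, with simplified stand-in types (this ControllerContext and InitFunc are illustrative stubs, far smaller than the real structs):

package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical, pared-down stand-ins for the real kube-controller-manager types.
type ControllerContext struct{ Name string }

// InitFunc mirrors the new two-parameter shape introduced by this PR.
type InitFunc func(ctx context.Context, controllerContext ControllerContext) (enabled bool, err error)

// startExampleController follows the new signature: cancellation comes from
// ctx.Done() instead of a dedicated Stop field on ControllerContext.
func startExampleController(ctx context.Context, controllerContext ControllerContext) (bool, error) {
	go func() {
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done(): // was: <-controllerContext.Stop
				fmt.Println(controllerContext.Name, "shutting down")
				return
			case <-ticker.C:
				fmt.Println(controllerContext.Name, "syncing")
			}
		}
	}()
	return true, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var start InitFunc = startExampleController
	if _, err := start(ctx, ControllerContext{Name: "example"}); err != nil {
		panic(err)
	}
	time.Sleep(1200 * time.Millisecond)
	cancel()
	time.Sleep(100 * time.Millisecond)
}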


@@ -21,6 +21,7 @@ limitations under the License.
package app
import (
"context"
"fmt"
"time"
@@ -32,53 +33,53 @@ import (
"k8s.io/kubernetes/pkg/controller/statefulset"
)
func startDaemonSetController(ctx ControllerContext) (controller.Interface, bool, error) {
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
dsc, err := daemon.NewDaemonSetsController(
ctx.InformerFactory.Apps().V1().DaemonSets(),
ctx.InformerFactory.Apps().V1().ControllerRevisions(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"),
flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
)
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Done())
return nil, true, nil
}
func startStatefulSetController(ctx ControllerContext) (controller.Interface, bool, error) {
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go statefulset.NewStatefulSetController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Apps().V1().StatefulSets(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Apps().V1().ControllerRevisions(),
ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(int(ctx.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Stop)
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Apps().V1().StatefulSets(),
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done())
return nil, true, nil
}
func startReplicaSetController(ctx ControllerContext) (controller.Interface, bool, error) {
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Done())
return nil, true, nil
}
func startDeploymentController(ctx ControllerContext) (controller.Interface, bool, error) {
func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
controllerContext.InformerFactory.Apps().V1().Deployments(),
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Done())
return nil, true, nil
}
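Worth noting in this file: dsc.Run and the other Run methods still take a plain <-chan struct{}. The substitution works because ctx.Done() returns exactly that channel type, closed when the context is cancelled, making it a drop-in replacement for the deleted Stop field. A small sketch (runUntil is a hypothetical stand-in for a controller Run method, not the real DaemonSetsController.Run):

package main

import (
	"context"
	"fmt"
	"time"
)

// runUntil mimics the pre-existing controller Run signature: it only knows
// about a stop channel, not about contexts.
func runUntil(workers int, stopCh <-chan struct{}) {
	fmt.Println("running with", workers, "workers")
	<-stopCh // blocks until the channel is closed
	fmt.Println("stopped")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// ctx.Done() already has type <-chan struct{}, so the Run methods
	// themselves need no change in this PR.
	go runUntil(2, ctx.Done())
	time.Sleep(500 * time.Millisecond)
	cancel() // cancelling the context closes the Done channel, stopping the worker
	time.Sleep(100 * time.Millisecond)
}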


@@ -21,6 +21,8 @@ limitations under the License.
package app
import (
"context"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/scale"
@@ -33,42 +35,42 @@ import (
"k8s.io/metrics/pkg/client/external_metrics"
)
func startHPAController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
func startHPAController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
return nil, false, nil
}
return startHPAControllerWithRESTClient(ctx)
return startHPAControllerWithRESTClient(ctx, controllerContext)
}
func startHPAControllerWithRESTClient(ctx ControllerContext) (controller.Interface, bool, error) {
clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
// invalidate the discovery information roughly once per resync interval, so our API
// information is *at most* two resync intervals old.
go custom_metrics.PeriodicallyInvalidate(
apiVersionsGetter,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.Stop)
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.Done())
metricsClient := metrics.NewRESTMetricsClient(
resourceclient.NewForConfigOrDie(clientConfig),
custom_metrics.NewForConfig(clientConfig, ctx.RESTMapper, apiVersionsGetter),
custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
external_metrics.NewForConfigOrDie(clientConfig),
)
return startHPAControllerWithMetricsClient(ctx, metricsClient)
return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient)
}
func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) {
hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) {
hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
// we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
// so we want to re-fetch every time we actually ask for it
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, false, err
}
@@ -77,15 +79,15 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me
hpaClient.CoreV1(),
scaleClient,
hpaClient.AutoscalingV1(),
ctx.RESTMapper,
controllerContext.RESTMapper,
metricsClient,
ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
).Run(ctx.Stop)
controllerContext.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
).Run(ctx.Done())
return nil, true, nil
}
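The goroutine handed ctx.Done() above, custom_metrics.PeriodicallyInvalidate, is a refresh loop: invalidate the discovery information once per HPA sync period until the stop channel closes, which keeps the cached API data at most two resync intervals old. A hedged sketch of that loop shape (staleCache and its Invalidate method are illustrative, not the real custom_metrics API):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// staleCache is a hypothetical stand-in for the discovery-backed
// apiVersionsGetter whose data goes stale between resyncs.
type staleCache struct{ generation int64 }

func (c *staleCache) Invalidate() { atomic.AddInt64(&c.generation, 1) }

// periodicallyInvalidate mirrors the shape of the call above: tick at the
// HPA sync period, invalidate, and exit when stopCh closes.
func periodicallyInvalidate(c *staleCache, interval time.Duration, stopCh <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			c.Invalidate()
			fmt.Println("invalidated; generation", atomic.LoadInt64(&c.generation))
		case <-stopCh:
			return
		}
	}
}

func main() {
	c := &staleCache{}
	stop := make(chan struct{})
	go periodicallyInvalidate(c, 200*time.Millisecond, stop)
	time.Sleep(time.Second)
	close(stop) // in the wired-up code, this is the context's Done channel closing
}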


@@ -21,6 +21,7 @@ limitations under the License.
package app
import (
"context"
"fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -30,33 +31,33 @@ import (
kubefeatures "k8s.io/kubernetes/pkg/features"
)
func startJobController(ctx ControllerContext) (controller.Interface, bool, error) {
func startJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go job.NewController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Batch().V1().Jobs(),
ctx.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Done())
return nil, true, nil
}
func startCronJobController(ctx ControllerContext) (controller.Interface, bool, error) {
func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) {
cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(),
ctx.InformerFactory.Batch().V1().CronJobs(),
ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
cj2c, err := cronjob.NewControllerV2(controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.InformerFactory.Batch().V1().CronJobs(),
controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err)
}
go cj2c.Run(int(ctx.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Stop)
go cj2c.Run(int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Done())
return nil, true, nil
}
cjc, err := cronjob.NewController(
ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
}
go cjc.Run(ctx.Stop)
go cjc.Run(ctx.Done())
return nil, true, nil
}
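startCronJobController is the one initializer in this file with a branch: behind the CronJobControllerV2 feature gate it constructs the v2 controller, otherwise the legacy one, and both are started with ctx.Done(). A compressed sketch of that gate-then-start pattern (the gate map and controller types here are illustrative stand-ins, not utilfeature.DefaultFeatureGate or the real cronjob package):

package main

import (
	"context"
	"fmt"
	"time"
)

// featureGates is a hypothetical stand-in for utilfeature.DefaultFeatureGate.
var featureGates = map[string]bool{"CronJobControllerV2": true}

type runnable interface{ Run(stopCh <-chan struct{}) }

type legacyCronJobController struct{}

func (legacyCronJobController) Run(stopCh <-chan struct{}) {
	fmt.Println("legacy cronjob controller running")
	<-stopCh
}

type cronJobControllerV2 struct{}

func (cronJobControllerV2) Run(stopCh <-chan struct{}) {
	fmt.Println("cronjob controller v2 running")
	<-stopCh
}

// startCronJob mirrors startCronJobController: pick the implementation behind
// the gate, start it with the context's Done channel, report enabled=true.
func startCronJob(ctx context.Context) (bool, error) {
	var c runnable = legacyCronJobController{}
	if featureGates["CronJobControllerV2"] {
		c = cronJobControllerV2{}
	}
	go c.Run(ctx.Done())
	return true, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if _, err := startCronJob(ctx); err != nil {
		panic(err)
	}
	time.Sleep(100 * time.Millisecond) // let the goroutine print before exit
}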


@@ -17,35 +17,36 @@ limitations under the License.
package app
import (
"context"
"fmt"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/bootstrap"
)
func startBootstrapSignerController(ctx ControllerContext) (controller.Interface, bool, error) {
func startBootstrapSignerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
bsc, err := bootstrap.NewSigner(
ctx.ClientBuilder.ClientOrDie("bootstrap-signer"),
ctx.InformerFactory.Core().V1().Secrets(),
ctx.InformerFactory.Core().V1().ConfigMaps(),
controllerContext.ClientBuilder.ClientOrDie("bootstrap-signer"),
controllerContext.InformerFactory.Core().V1().Secrets(),
controllerContext.InformerFactory.Core().V1().ConfigMaps(),
bootstrap.DefaultSignerOptions(),
)
if err != nil {
return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err)
}
go bsc.Run(ctx.Stop)
go bsc.Run(ctx.Done())
return nil, true, nil
}
func startTokenCleanerController(ctx ControllerContext) (controller.Interface, bool, error) {
func startTokenCleanerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
tcc, err := bootstrap.NewTokenCleaner(
ctx.ClientBuilder.ClientOrDie("token-cleaner"),
ctx.InformerFactory.Core().V1().Secrets(),
controllerContext.ClientBuilder.ClientOrDie("token-cleaner"),
controllerContext.InformerFactory.Core().V1().Secrets(),
bootstrap.DefaultTokenCleanerOptions(),
)
if err != nil {
return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err)
}
go tcc.Run(ctx.Stop)
go tcc.Run(ctx.Done())
return nil, true, nil
}


@@ -21,6 +21,7 @@ limitations under the License.
package app
import (
"context"
"fmt"
"k8s.io/controller-manager/controller"
@@ -32,56 +33,56 @@ import (
csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config"
)
func startCSRSigningController(ctx ControllerContext) (controller.Interface, bool, error) {
missingSingleSigningFile := ctx.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || ctx.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == ""
if missingSingleSigningFile && !anySpecificFilesSet(ctx.ComponentConfig.CSRSigningController) {
func startCSRSigningController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
missingSingleSigningFile := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || controllerContext.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == ""
if missingSingleSigningFile && !anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
klog.V(2).Info("skipping CSR signer controller because no csr cert/key was specified")
return nil, false, nil
}
if !missingSingleSigningFile && anySpecificFilesSet(ctx.ComponentConfig.CSRSigningController) {
if !missingSingleSigningFile && anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
return nil, false, fmt.Errorf("cannot specify default and per controller certs at the same time")
}
c := ctx.ClientBuilder.ClientOrDie("certificate-controller")
csrInformer := ctx.InformerFactory.Certificates().V1().CertificateSigningRequests()
certTTL := ctx.ComponentConfig.CSRSigningController.ClusterSigningDuration.Duration
c := controllerContext.ClientBuilder.ClientOrDie("certificate-controller")
csrInformer := controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests()
certTTL := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningDuration.Duration
if kubeletServingSignerCertFile, kubeletServingSignerKeyFile := getKubeletServingSignerFiles(ctx.ComponentConfig.CSRSigningController); len(kubeletServingSignerCertFile) > 0 || len(kubeletServingSignerKeyFile) > 0 {
if kubeletServingSignerCertFile, kubeletServingSignerKeyFile := getKubeletServingSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletServingSignerCertFile) > 0 || len(kubeletServingSignerKeyFile) > 0 {
kubeletServingSigner, err := signer.NewKubeletServingCSRSigningController(c, csrInformer, kubeletServingSignerCertFile, kubeletServingSignerKeyFile, certTTL)
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err)
}
go kubeletServingSigner.Run(5, ctx.Stop)
go kubeletServingSigner.Run(5, ctx.Done())
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving")
}
if kubeletClientSignerCertFile, kubeletClientSignerKeyFile := getKubeletClientSignerFiles(ctx.ComponentConfig.CSRSigningController); len(kubeletClientSignerCertFile) > 0 || len(kubeletClientSignerKeyFile) > 0 {
if kubeletClientSignerCertFile, kubeletClientSignerKeyFile := getKubeletClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletClientSignerCertFile) > 0 || len(kubeletClientSignerKeyFile) > 0 {
kubeletClientSigner, err := signer.NewKubeletClientCSRSigningController(c, csrInformer, kubeletClientSignerCertFile, kubeletClientSignerKeyFile, certTTL)
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err)
}
go kubeletClientSigner.Run(5, ctx.Stop)
go kubeletClientSigner.Run(5, ctx.Done())
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet")
}
if kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile := getKubeAPIServerClientSignerFiles(ctx.ComponentConfig.CSRSigningController); len(kubeAPIServerSignerCertFile) > 0 || len(kubeAPIServerSignerKeyFile) > 0 {
if kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile := getKubeAPIServerClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeAPIServerSignerCertFile) > 0 || len(kubeAPIServerSignerKeyFile) > 0 {
kubeAPIServerClientSigner, err := signer.NewKubeAPIServerClientCSRSigningController(c, csrInformer, kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile, certTTL)
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err)
}
go kubeAPIServerClientSigner.Run(5, ctx.Stop)
go kubeAPIServerClientSigner.Run(5, ctx.Done())
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client")
}
if legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile := getLegacyUnknownSignerFiles(ctx.ComponentConfig.CSRSigningController); len(legacyUnknownSignerCertFile) > 0 || len(legacyUnknownSignerKeyFile) > 0 {
if legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile := getLegacyUnknownSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(legacyUnknownSignerCertFile) > 0 || len(legacyUnknownSignerKeyFile) > 0 {
legacyUnknownSigner, err := signer.NewLegacyUnknownCSRSigningController(c, csrInformer, legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile, certTTL)
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err)
}
go legacyUnknownSigner.Run(5, ctx.Stop)
go legacyUnknownSigner.Run(5, ctx.Done())
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown")
}
@@ -147,47 +148,47 @@ func getLegacyUnknownSignerFiles(config csrsigningconfig.CSRSigningControllerCon
return config.ClusterSigningCertFile, config.ClusterSigningKeyFile
}
func startCSRApprovingController(ctx ControllerContext) (controller.Interface, bool, error) {
func startCSRApprovingController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
approver := approver.NewCSRApprovingController(
ctx.ClientBuilder.ClientOrDie("certificate-controller"),
ctx.InformerFactory.Certificates().V1().CertificateSigningRequests(),
controllerContext.ClientBuilder.ClientOrDie("certificate-controller"),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
)
go approver.Run(5, ctx.Stop)
go approver.Run(5, ctx.Done())
return nil, true, nil
}
func startCSRCleanerController(ctx ControllerContext) (controller.Interface, bool, error) {
func startCSRCleanerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
cleaner := cleaner.NewCSRCleanerController(
ctx.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(),
ctx.InformerFactory.Certificates().V1().CertificateSigningRequests(),
controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
)
go cleaner.Run(1, ctx.Stop)
go cleaner.Run(1, ctx.Done())
return nil, true, nil
}
func startRootCACertPublisher(ctx ControllerContext) (controller.Interface, bool, error) {
func startRootCACertPublisher(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
var (
rootCA []byte
err error
)
if ctx.ComponentConfig.SAController.RootCAFile != "" {
if rootCA, err = readCA(ctx.ComponentConfig.SAController.RootCAFile); err != nil {
return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.ComponentConfig.SAController.RootCAFile, err)
if controllerContext.ComponentConfig.SAController.RootCAFile != "" {
if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil {
return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err)
}
} else {
rootCA = ctx.ClientBuilder.ConfigOrDie("root-ca-cert-publisher").CAData
rootCA = controllerContext.ClientBuilder.ConfigOrDie("root-ca-cert-publisher").CAData
}
sac, err := rootcacertpublisher.NewPublisher(
ctx.InformerFactory.Core().V1().ConfigMaps(),
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ClientBuilder.ClientOrDie("root-ca-cert-publisher"),
controllerContext.InformerFactory.Core().V1().ConfigMaps(),
controllerContext.InformerFactory.Core().V1().Namespaces(),
controllerContext.ClientBuilder.ClientOrDie("root-ca-cert-publisher"),
rootCA,
)
if err != nil {
return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err)
}
go sac.Run(1, ctx.Stop)
go sac.Run(1, ctx.Done())
return nil, true, nil
}
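A detail easy to miss in startCSRSigningController above: the default cluster signing cert/key pair and the per-signer files are mutually exclusive, and having neither simply disables the controller rather than erroring. A sketch of just that validation, with field names simplified from the real CSRSigningControllerConfiguration (this struct is a hypothetical reduction):

package main

import (
	"errors"
	"fmt"
)

// signingConfig is a simplified, hypothetical stand-in for the real
// CSRSigningControllerConfiguration.
type signingConfig struct {
	ClusterSigningCertFile string // default pair, shared by every signer
	ClusterSigningKeyFile  string
	KubeletServingCertFile string // per-signer overrides
	KubeletClientCertFile  string
}

func anySpecificFilesSet(c signingConfig) bool {
	return c.KubeletServingCertFile != "" || c.KubeletClientCertFile != ""
}

// validate mirrors the branching at the top of startCSRSigningController:
// nothing configured -> controller disabled (not an error); both styles -> error.
func validate(c signingConfig) (enabled bool, err error) {
	missingSingleSigningFile := c.ClusterSigningCertFile == "" || c.ClusterSigningKeyFile == ""
	if missingSingleSigningFile && !anySpecificFilesSet(c) {
		return false, nil // skip the controller entirely
	}
	if !missingSingleSigningFile && anySpecificFilesSet(c) {
		return false, errors.New("cannot specify default and per controller certs at the same time")
	}
	return true, nil
}

func main() {
	ok, err := validate(signingConfig{ClusterSigningCertFile: "ca.crt", ClusterSigningKeyFile: "ca.key"})
	fmt.Println(ok, err) // true <nil>: the default pair signs everything

	ok, err = validate(signingConfig{
		ClusterSigningCertFile: "ca.crt", ClusterSigningKeyFile: "ca.key",
		KubeletServingCertFile: "serving.crt",
	})
	fmt.Println(ok, err) // false + error: the two styles cannot be mixed
}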


@@ -225,12 +225,12 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
klog.Fatalf("error building controller context: %v", err)
}
controllerInitializers := initializersFunc(controllerContext.LoopMode)
if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
controllerContext.InformerFactory.Start(stopCh)
controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted)
select {}
@@ -265,9 +265,9 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// Wrap saTokenControllerInitFunc to signal readiness for migration after starting
// the controller.
startSATokenController = func(ctx ControllerContext) (controller.Interface, bool, error) {
startSATokenController = func(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
defer close(leaderMigrator.MigrationReady)
return saTokenControllerInitFunc(ctx)
return saTokenControllerInitFunc(ctx, controllerContext)
}
}
@@ -352,9 +352,6 @@ type ControllerContext struct {
// ExternalLoops is for a kube-controller-manager running with a cloud-controller-manager
LoopMode ControllerLoopMode
// Stop is the stop channel
Stop <-chan struct{}
// InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe
// for an individual controller to start the shared informers. Before it is closed, they should not.
InformersStarted chan struct{}
@@ -377,7 +374,7 @@ func (c ControllerContext) IsControllerEnabled(name string) bool {
// that requests no additional features from the controller manager.
// Any error returned will cause the controller process to `Fatal`.
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx ControllerContext) (controller controller.Interface, enabled bool, err error)
type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller controller.Interface, enabled bool, err error)
// ControllerInitializersFunc is used to create a collection of initializers
// given the loopMode.
@@ -535,7 +532,6 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
AvailableResources: availableResources,
Cloud: cloud,
LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
}
@@ -543,34 +539,34 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
}
// StartControllers starts a set of controllers with a specified ControllerContext
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest.
// If this fails, just return here and fail since other controllers won't be able to get credentials.
if startSATokenController != nil {
if _, _, err := startSATokenController(ctx); err != nil {
if _, _, err := startSATokenController(ctx, controllerCtx); err != nil {
return err
}
}
// Initialize the cloud provider with a reference to the clientBuilder only after the token controller
// has started, in case the cloud provider uses the client builder.
if ctx.Cloud != nil {
ctx.Cloud.Initialize(ctx.ClientBuilder, ctx.Stop)
if controllerCtx.Cloud != nil {
controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done())
}
var controllerChecks []healthz.HealthChecker
for controllerName, initFn := range controllers {
if !ctx.IsControllerEnabled(controllerName) {
if !controllerCtx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName)
continue
}
time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
klog.V(1).Infof("Starting %q", controllerName)
ctrl, started, err := initFn(ctx)
ctrl, started, err := initFn(ctx, controllerCtx)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
@@ -613,25 +609,25 @@ type serviceAccountTokenControllerStarter struct {
rootClientBuilder clientbuilder.ControllerClientBuilder
}
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.IsControllerEnabled(saTokenControllerName) {
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if !controllerContext.IsControllerEnabled(saTokenControllerName) {
klog.Warningf("%q is disabled", saTokenControllerName)
return nil, false, nil
}
if len(ctx.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
klog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
return nil, false, nil
}
privateKey, err := keyutil.PrivateKeyFromFile(ctx.ComponentConfig.SAController.ServiceAccountKeyFile)
privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile)
if err != nil {
return nil, true, fmt.Errorf("error reading key for service account token controller: %v", err)
}
var rootCA []byte
if ctx.ComponentConfig.SAController.RootCAFile != "" {
if rootCA, err = readCA(ctx.ComponentConfig.SAController.RootCAFile); err != nil {
return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.ComponentConfig.SAController.RootCAFile, err)
if controllerContext.ComponentConfig.SAController.RootCAFile != "" {
if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil {
return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err)
}
} else {
rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
@@ -642,8 +638,8 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController
return nil, false, fmt.Errorf("failed to build token generator: %v", err)
}
controller, err := serviceaccountcontroller.NewTokensController(
ctx.InformerFactory.Core().V1().ServiceAccounts(),
ctx.InformerFactory.Core().V1().Secrets(),
controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
controllerContext.InformerFactory.Core().V1().Secrets(),
c.rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: tokenGenerator,
@@ -653,10 +649,10 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController
if err != nil {
return nil, true, fmt.Errorf("error creating Tokens controller: %v", err)
}
go controller.Run(int(ctx.ComponentConfig.SAController.ConcurrentSATokenSyncs), ctx.Stop)
go controller.Run(int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs), ctx.Done())
// start the first set of informers now so that other controllers can start
ctx.InformerFactory.Start(ctx.Stop)
controllerContext.InformerFactory.Start(ctx.Done())
return nil, true, nil
}
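StartControllers itself now carries both parameters: the context flows to every InitFunc while ControllerContext keeps the shared machinery, and the SA token controller still runs first because it mints the credentials the others need. A condensed, hypothetical sketch of that loop under the new signature (types reduced to the essentials; wait.Jitter is approximated with a plain random sleep):

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// ControllerContext here is a hypothetical stub; the real struct carries
// informer factories, client builders, component config, and more.
type ControllerContext struct{ Disabled map[string]bool }

type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (enabled bool, err error)

// startControllers mirrors the shape of StartControllers: run the privileged
// SA token controller first, then each remaining InitFunc with start jitter.
func startControllers(ctx context.Context, controllerCtx ControllerContext,
	saTokenController InitFunc, controllers map[string]InitFunc) error {
	if saTokenController != nil {
		if _, err := saTokenController(ctx, controllerCtx); err != nil {
			return err // without tokens the other controllers cannot get credentials
		}
	}
	for name, initFn := range controllers {
		if controllerCtx.Disabled[name] {
			fmt.Printf("%q is disabled\n", name)
			continue
		}
		time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) // stand-in for wait.Jitter
		if _, err := initFn(ctx, controllerCtx); err != nil {
			return fmt.Errorf("error starting %q: %w", name, err)
		}
		fmt.Printf("started %q\n", name)
	}
	return nil
}

func main() {
	noop := func(context.Context, ControllerContext) (bool, error) { return true, nil }
	err := startControllers(context.Background(),
		ControllerContext{Disabled: map[string]bool{"route": true}},
		noop, map[string]InitFunc{"daemonset": noop, "route": noop})
	fmt.Println("err:", err)
}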


@@ -21,6 +21,7 @@ limitations under the License.
package app
import (
"context"
"errors"
"fmt"
"net"
@@ -77,13 +78,13 @@ const (
defaultNodeMaskCIDRIPv6 = 64
)
func startServiceController(ctx ControllerContext) (controller.Interface, bool, error) {
func startServiceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
serviceController, err := servicecontroller.New(
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("service-controller"),
ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ComponentConfig.KubeCloudShared.ClusterName,
controllerContext.Cloud,
controllerContext.ClientBuilder.ClientOrDie("service-controller"),
controllerContext.InformerFactory.Core().V1().Services(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
utilfeature.DefaultFeatureGate,
)
if err != nil {
@@ -91,21 +92,21 @@ func startServiceController(ctx ControllerContext) (controller.Interface, bool,
klog.Errorf("Failed to start service controller: %v", err)
return nil, false, nil
}
go serviceController.Run(ctx.Stop, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
go serviceController.Run(ctx.Done(), int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
return nil, true, nil
}
func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool, error) {
func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
var serviceCIDR *net.IPNet
var secondaryServiceCIDR *net.IPNet
// should we start nodeIPAM
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
return nil, false, nil
}
// failure: bad cidrs in config
clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
clusterCIDRs, dualStack, err := processCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
return nil, false, err
}
@@ -121,17 +122,17 @@ func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool,
}
// service cidr processing
if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
_, serviceCIDR, err = netutils.ParseCIDRSloppy(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)
if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
_, serviceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)
if err != nil {
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.ServiceCIDR, err)
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR, err)
}
}
if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 {
_, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)
if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 {
_, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)
if err != nil {
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err)
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err)
}
}
@@ -149,60 +150,60 @@ func startNodeIpamController(ctx ControllerContext) (controller.Interface, bool,
// only --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 supported with dual stack clusters.
// --node-cidr-mask-size flag is incompatible with dual stack clusters.
nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(ctx.ComponentConfig.NodeIPAMController, clusterCIDRs)
nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(controllerContext.ComponentConfig.NodeIPAMController, clusterCIDRs)
if err != nil {
return nil, false, err
}
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
ctx.InformerFactory.Core().V1().Nodes(),
ctx.Cloud,
ctx.ClientBuilder.ClientOrDie("node-controller"),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.Cloud,
controllerContext.ClientBuilder.ClientOrDie("node-controller"),
clusterCIDRs,
serviceCIDR,
secondaryServiceCIDR,
nodeCIDRMaskSizes,
ipam.CIDRAllocatorType(ctx.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
ipam.CIDRAllocatorType(controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
)
if err != nil {
return nil, true, err
}
go nodeIpamController.Run(ctx.Stop)
go nodeIpamController.Run(ctx.Done())
return nil, true, nil
}
func startNodeLifecycleController(ctx ControllerContext) (controller.Interface, bool, error) {
func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
ctx.InformerFactory.Coordination().V1().Leases(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Apps().V1().DaemonSets(),
controllerContext.InformerFactory.Coordination().V1().Leases(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
// node lifecycle controller uses existing cluster role from node-controller
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager,
controllerContext.ClientBuilder.ClientOrDie("node-controller"),
controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
controllerContext.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
controllerContext.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
controllerContext.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
controllerContext.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
controllerContext.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
controllerContext.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
controllerContext.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
controllerContext.ComponentConfig.NodeLifecycleController.EnableTaintManager,
)
if err != nil {
return nil, true, err
}
go lifecycleController.Run(ctx.Stop)
go lifecycleController.Run(ctx.Done())
return nil, true, nil
}
func startCloudNodeLifecycleController(ctx ControllerContext) (controller.Interface, bool, error) {
func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
ctx.InformerFactory.Core().V1().Nodes(),
controllerContext.InformerFactory.Core().V1().Nodes(),
// cloud node lifecycle controller uses existing cluster role from node-controller
ctx.ClientBuilder.ClientOrDie("node-controller"),
ctx.Cloud,
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
controllerContext.ClientBuilder.ClientOrDie("node-controller"),
controllerContext.Cloud,
controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
)
if err != nil {
// the controller manager should continue to run if the "Instances" interface is not
@@ -211,27 +212,27 @@ func startCloudNodeLifecycleController(ctx ControllerContext) (controller.Interf
return nil, false, nil
}
go cloudNodeLifecycleController.Run(ctx.Stop)
go cloudNodeLifecycleController.Run(ctx.Done())
return nil, true, nil
}
func startRouteController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
func startRouteController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
return nil, false, nil
}
if ctx.Cloud == nil {
if controllerContext.Cloud == nil {
klog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
return nil, false, nil
}
routes, ok := ctx.Cloud.Routes()
routes, ok := controllerContext.Cloud.Routes()
if !ok {
klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
return nil, false, nil
}
// failure: bad cidrs in config
clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
clusterCIDRs, dualStack, err := processCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
return nil, false, err
}
@@ -247,54 +248,54 @@ func startRouteController(ctx ControllerContext) (controller.Interface, bool, er
}
routeController := routecontroller.New(routes,
ctx.ClientBuilder.ClientOrDie("route-controller"),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ComponentConfig.KubeCloudShared.ClusterName,
controllerContext.ClientBuilder.ClientOrDie("route-controller"),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
clusterCIDRs)
go routeController.Run(ctx.Stop, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
go routeController.Run(ctx.Done(), controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
return nil, true, nil
}
func startPersistentVolumeBinderController(ctx ControllerContext) (controller.Interface, bool, error) {
plugins, err := ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
plugins, err := ProbeControllerVolumePlugins(controllerContext.Cloud, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
if err != nil {
return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
}
filteredDialOptions, err := options.ParseVolumeHostFilters(
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
if err != nil {
return nil, true, err
}
params := persistentvolumecontroller.ControllerParameters{
KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
SyncPeriod: ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
KubeClient: controllerContext.ClientBuilder.ClientOrDie("persistent-volume-binder"),
SyncPeriod: controllerContext.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
VolumePlugins: plugins,
Cloud: ctx.Cloud,
ClusterName: ctx.ComponentConfig.KubeCloudShared.ClusterName,
VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),
ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(),
PodInformer: ctx.InformerFactory.Core().V1().Pods(),
NodeInformer: ctx.InformerFactory.Core().V1().Nodes(),
EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
Cloud: controllerContext.Cloud,
ClusterName: controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
VolumeInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
ClaimInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
ClassInformer: controllerContext.InformerFactory.Storage().V1().StorageClasses(),
PodInformer: controllerContext.InformerFactory.Core().V1().Pods(),
NodeInformer: controllerContext.InformerFactory.Core().V1().Nodes(),
EnableDynamicProvisioning: controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
FilteredDialOptions: filteredDialOptions,
}
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
if volumeControllerErr != nil {
return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
}
go volumeController.Run(ctx.Stop)
go volumeController.Run(ctx.Done())
return nil, true, nil
}
func startAttachDetachController(ctx ControllerContext) (controller.Interface, bool, error) {
if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
func startAttachDetachController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second {
return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period")
}
csiNodeInformer := ctx.InformerFactory.Storage().V1().CSINodes()
csiDriverInformer := ctx.InformerFactory.Storage().V1().CSIDrivers()
csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes()
csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers()
plugins, err := ProbeAttachableVolumePlugins()
if err != nil {
@@ -302,55 +303,55 @@ func startAttachDetachController(ctx ControllerContext) (controller.Interface, b
}
filteredDialOptions, err := options.ParseVolumeHostFilters(
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
if err != nil {
return nil, true, err
}
attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController(
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(),
controllerContext.ClientBuilder.ClientOrDie("attachdetach-controller"),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
csiNodeInformer,
csiDriverInformer,
ctx.InformerFactory.Storage().V1().VolumeAttachments(),
ctx.Cloud,
controllerContext.InformerFactory.Storage().V1().VolumeAttachments(),
controllerContext.Cloud,
plugins,
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
ctx.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync,
ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration,
GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
controllerContext.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync,
controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration,
attachdetach.DefaultTimerConfig,
filteredDialOptions,
)
if attachDetachControllerErr != nil {
return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
}
go attachDetachController.Run(ctx.Stop)
go attachDetachController.Run(ctx.Done())
return nil, true, nil
}
func startVolumeExpandController(ctx ControllerContext) (controller.Interface, bool, error) {
func startVolumeExpandController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
plugins, err := ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
plugins, err := ProbeExpandableVolumePlugins(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
if err != nil {
return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err)
}
csiTranslator := csitrans.New()
filteredDialOptions, err := options.ParseVolumeHostFilters(
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist,
controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback)
if err != nil {
return nil, true, err
}
expandController, expandControllerErr := expand.NewExpandController(
ctx.ClientBuilder.ClientOrDie("expand-controller"),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.Cloud,
controllerContext.ClientBuilder.ClientOrDie("expand-controller"),
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
controllerContext.Cloud,
plugins,
csiTranslator,
csimigration.NewPluginManager(csiTranslator, utilfeature.DefaultFeatureGate),
@@ -360,74 +361,74 @@ func startVolumeExpandController(ctx ControllerContext) (controller.Interface, b
if expandControllerErr != nil {
return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr)
}
go expandController.Run(ctx.Stop)
go expandController.Run(ctx.Done())
return nil, true, nil
}
return nil, false, nil
}
func startEphemeralVolumeController(ctx ControllerContext) (controller.Interface, bool, error) {
func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) {
ephemeralController, err := ephemeral.NewController(
ctx.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims())
controllerContext.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims())
if err != nil {
return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err)
}
go ephemeralController.Run(int(ctx.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), ctx.Stop)
go ephemeralController.Run(int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), ctx.Done())
return nil, true, nil
}
return nil, false, nil
}
func startEndpointController(ctx ControllerContext) (controller.Interface, bool, error) {
func startEndpointController(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Endpoints(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
controllerCtx.InformerFactory.Core().V1().Pods(),
controllerCtx.InformerFactory.Core().V1().Services(),
controllerCtx.InformerFactory.Core().V1().Endpoints(),
controllerCtx.ClientBuilder.ClientOrDie("endpoint-controller"),
controllerCtx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
).Run(int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Done())
return nil, true, nil
}
func startReplicationController(ctx ControllerContext) (controller.Interface, bool, error) {
func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().ReplicationControllers(),
ctx.ClientBuilder.ClientOrDie("replication-controller"),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
replicationcontroller.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Done())
return nil, true, nil
}
func startPodGCController(ctx ControllerContext) (controller.Interface, bool, error) {
func startPodGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
).Run(ctx.Stop)
controllerContext.ClientBuilder.ClientOrDie("pod-garbage-collector"),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Nodes(),
int(controllerContext.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
).Run(ctx.Done())
return nil, true, nil
}
func startResourceQuotaController(ctx ControllerContext) (controller.Interface, bool, error) {
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaControllerDiscoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource)
listerFuncForResource := generic.ListerFuncForResourceFunc(controllerContext.InformerFactory.ForResource)
quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)
resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
QuotaClient: resourceQuotaControllerClient.CoreV1(),
ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(),
ResyncPeriod: pkgcontroller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
InformerFactory: ctx.ObjectOrMetadataInformerFactory,
ReplenishmentResyncPeriod: ctx.ResyncPeriod,
ResourceQuotaInformer: controllerContext.InformerFactory.Core().V1().ResourceQuotas(),
ResyncPeriod: pkgcontroller.StaticResyncPeriodFunc(controllerContext.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
InformerFactory: controllerContext.ObjectOrMetadataInformerFactory,
ReplenishmentResyncPeriod: controllerContext.ResyncPeriod,
DiscoveryFunc: discoveryFunc,
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
InformersStarted: ctx.InformersStarted,
InformersStarted: controllerContext.InformersStarted,
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
}
if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil {
@@ -440,26 +441,26 @@ func startResourceQuotaController(ctx ControllerContext) (controller.Interface,
if err != nil {
return nil, false, err
}
go resourceQuotaController.Run(int(ctx.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Stop)
go resourceQuotaController.Run(int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Done())
// Periodically resync the quota controller to detect new resource types
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop)
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())
return nil, true, nil
}
func startNamespaceController(ctx ControllerContext) (controller.Interface, bool, error) {
func startNamespaceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls;
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource,
// including events) takes ~10 seconds by default.
nsKubeconfig := ctx.ClientBuilder.ConfigOrDie("namespace-controller")
nsKubeconfig := controllerContext.ClientBuilder.ConfigOrDie("namespace-controller")
nsKubeconfig.QPS *= 20
nsKubeconfig.Burst *= 100
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
return startModifiedNamespaceController(ctx, namespaceKubeClient, nsKubeconfig)
return startModifiedNamespaceController(ctx, controllerContext, namespaceKubeClient, nsKubeconfig)
}
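
The multipliers above follow from the chattiness comment: the namespace controller needs far more client-side rate-limit headroom than the default config allows. A hedged sketch of the same tuning on a defensive copy (CopyConfig, QPS, and Burst are real client-go rest.Config API; the multipliers simply mirror the values above):

package app

import restclient "k8s.io/client-go/rest"

// tunedNamespaceConfig returns a copy of base with a much larger client-side
// rate-limit budget. Copying first avoids mutating a config that may be
// shared with other controllers.
func tunedNamespaceConfig(base *restclient.Config) *restclient.Config {
    cfg := restclient.CopyConfig(base)
    cfg.QPS *= 20    // sustained requests per second
    cfg.Burst *= 100 // short-term burst allowance
    return cfg
}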
func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {
func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {
metadataClient, err := metadata.NewForConfig(nsKubeconfig)
if err != nil {
@ -472,46 +473,46 @@ func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient
namespaceKubeClient,
metadataClient,
discoverResourcesFn,
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
controllerContext.InformerFactory.Core().V1().Namespaces(),
controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
v1.FinalizerKubernetes,
)
go namespaceController.Run(int(ctx.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Stop)
go namespaceController.Run(int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Done())
return nil, true, nil
}
func startServiceAccountController(ctx ControllerContext) (controller.Interface, bool, error) {
func startServiceAccountController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
sac, err := serviceaccountcontroller.NewServiceAccountsController(
ctx.InformerFactory.Core().V1().ServiceAccounts(),
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ClientBuilder.ClientOrDie("service-account-controller"),
controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
controllerContext.InformerFactory.Core().V1().Namespaces(),
controllerContext.ClientBuilder.ClientOrDie("service-account-controller"),
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
)
if err != nil {
return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err)
}
go sac.Run(1, ctx.Stop)
go sac.Run(1, ctx.Done())
return nil, true, nil
}
func startTTLController(ctx ControllerContext) (controller.Interface, bool, error) {
func startTTLController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go ttlcontroller.NewTTLController(
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("ttl-controller"),
).Run(5, ctx.Stop)
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ClientBuilder.ClientOrDie("ttl-controller"),
).Run(5, ctx.Done())
return nil, true, nil
}
func startGarbageCollectorController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
func startGarbageCollectorController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if !controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
return nil, false, nil
}
gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
discoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector")
discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
config := controllerContext.ClientBuilder.ConfigOrDie("generic-garbage-collector")
// Increase garbage collector controller's throughput: each object deletion takes two API calls,
// so to get |config.QPS| deletion rate we need to allow 2x more requests for this controller.
config.QPS *= 2
@ -521,64 +522,64 @@ func startGarbageCollectorController(ctx ControllerContext) (controller.Interfac
}
ignoredResources := make(map[schema.GroupResource]struct{})
for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
}
garbageCollector, err := garbagecollector.NewGarbageCollector(
gcClientset,
metadataClient,
ctx.RESTMapper,
controllerContext.RESTMapper,
ignoredResources,
ctx.ObjectOrMetadataInformerFactory,
ctx.InformersStarted,
controllerContext.ObjectOrMetadataInformerFactory,
controllerContext.InformersStarted,
)
if err != nil {
return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err)
}
// Start the garbage collector.
workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
go garbageCollector.Run(workers, ctx.Stop)
workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
go garbageCollector.Run(workers, ctx.Done())
// Periodically refresh the RESTMapper with new discovery information and sync
// the garbage collector.
go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Stop)
go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Done())
return garbageCollector, true, nil
}
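
The 2x bump above is plain arithmetic: if every deletion costs two API calls, the request budget must be twice the desired deletion rate. A trivial stand-alone illustration (the base QPS figure is made up):

package main

import "fmt"

func main() {
    const callsPerDeletion = 2.0 // per the comment above
    baseQPS := 20.0              // made-up example budget

    // Without the bump, deletions are capped at half the request budget.
    fmt.Printf("deletions/sec at base budget: %.0f\n", baseQPS/callsPerDeletion)

    // Doubling QPS restores a deletion rate equal to the original budget.
    fmt.Printf("deletions/sec after QPS *= 2: %.0f\n", baseQPS*2/callsPerDeletion)
}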
func startPVCProtectionController(ctx ControllerContext) (controller.Interface, bool, error) {
func startPVCProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("pvc-protection-controller"),
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
)
if err != nil {
return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err)
}
go pvcProtectionController.Run(1, ctx.Stop)
go pvcProtectionController.Run(1, ctx.Done())
return nil, true, nil
}
func startPVProtectionController(ctx ControllerContext) (controller.Interface, bool, error) {
func startPVProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go pvprotection.NewPVProtectionController(
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.ClientBuilder.ClientOrDie("pv-protection-controller"),
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
controllerContext.ClientBuilder.ClientOrDie("pv-protection-controller"),
utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection),
).Run(1, ctx.Stop)
).Run(1, ctx.Done())
return nil, true, nil
}
func startTTLAfterFinishedController(ctx ControllerContext) (controller.Interface, bool, error) {
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.TTLAfterFinished) {
return nil, false, nil
}
go ttlafterfinished.New(
ctx.InformerFactory.Batch().V1().Jobs(),
ctx.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
).Run(int(ctx.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Stop)
controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
).Run(int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Done())
return nil, true, nil
}
@ -674,11 +675,12 @@ func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, cl
}
return sortedSizes(ipv4Mask, ipv6Mask), nil
}
func startStorageVersionGCController(ctx ControllerContext) (controller.Interface, bool, error) {
func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go storageversiongc.NewStorageVersionGC(
ctx.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
ctx.InformerFactory.Coordination().V1().Leases(),
ctx.InformerFactory.Internal().V1alpha1().StorageVersions(),
).Run(ctx.Stop)
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
controllerContext.InformerFactory.Coordination().V1().Leases(),
controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(),
).Run(ctx.Done())
return nil, true, nil
}


@ -17,6 +17,7 @@ limitations under the License.
package app
import (
"context"
"testing"
"time"
@ -104,7 +105,7 @@ func possibleDiscoveryResource() []*metav1.APIResourceList {
}
}
type controllerInitFunc func(ControllerContext) (controller.Interface, bool, error)
type controllerInitFunc func(context.Context, ControllerContext) (controller.Interface, bool, error)
func TestController_DiscoveryError(t *testing.T) {
controllerInitFuncMap := map[string]controllerInitFunc{
@ -143,13 +144,13 @@ func TestController_DiscoveryError(t *testing.T) {
InformersStarted: make(chan struct{}),
}
for funcName, controllerInit := range controllerInitFuncMap {
_, _, err := controllerInit(ctx)
_, _, err := controllerInit(context.TODO(), ctx)
if test.expectedErr != (err != nil) {
t.Errorf("%v test failed for use case: %v", funcName, name)
}
}
_, _, err := startModifiedNamespaceController(
ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller"))
context.TODO(), ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller"))
if test.expectedErr != (err != nil) {
t.Errorf("Namespace Controller test failed for use case: %v", name)
}
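
context.TODO() above is the standard placeholder for call sites that do not yet have a meaningful context to thread through, which fits a test harness exercising the new signatures. A hypothetical test in the same spirit (buildTestControllerContext is an assumed helper, not part of this commit):

package app

import (
    "context"
    "testing"
)

func TestStartTTLControllerSmoke(t *testing.T) {
    // Assumed helper that assembles a ControllerContext backed by fakes.
    controllerCtx := buildTestControllerContext()
    if _, _, err := startTTLController(context.TODO(), controllerCtx); err != nil {
        t.Fatalf("startTTLController returned unexpected error: %v", err)
    }
}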


@ -21,32 +21,34 @@ limitations under the License.
package app
import (
"context"
"k8s.io/controller-manager/controller"
endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice"
endpointslicemirroringcontroller "k8s.io/kubernetes/pkg/controller/endpointslicemirroring"
)
func startEndpointSliceController(ctx ControllerContext) (controller.Interface, bool, error) {
func startEndpointSliceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go endpointslicecontroller.NewController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Discovery().V1().EndpointSlices(),
ctx.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
ctx.ClientBuilder.ClientOrDie("endpointslice-controller"),
ctx.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
).Run(int(ctx.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Stop)
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Services(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.InformerFactory.Discovery().V1().EndpointSlices(),
controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"),
controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
).Run(int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Done())
return nil, true, nil
}
func startEndpointSliceMirroringController(ctx ControllerContext) (controller.Interface, bool, error) {
func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go endpointslicemirroringcontroller.NewController(
ctx.InformerFactory.Core().V1().Endpoints(),
ctx.InformerFactory.Discovery().V1().EndpointSlices(),
ctx.InformerFactory.Core().V1().Services(),
ctx.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset,
ctx.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"),
ctx.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration,
).Run(int(ctx.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs), ctx.Stop)
controllerContext.InformerFactory.Core().V1().Endpoints(),
controllerContext.InformerFactory.Discovery().V1().EndpointSlices(),
controllerContext.InformerFactory.Core().V1().Services(),
controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset,
controllerContext.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"),
controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration,
).Run(int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs), ctx.Done())
return nil, true, nil
}


@ -21,6 +21,8 @@ limitations under the License.
package app
import (
"context"
"k8s.io/klog/v2"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -31,31 +33,31 @@ import (
kubefeatures "k8s.io/kubernetes/pkg/features"
)
func startDisruptionController(ctx ControllerContext) (controller.Interface, bool, error) {
func startDisruptionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
klog.InfoS("Refusing to start disruption because the PodDisruptionBudget feature is disabled")
return nil, false, nil
}
client := ctx.ClientBuilder.ClientOrDie("disruption-controller")
config := ctx.ClientBuilder.ConfigOrDie("disruption-controller")
client := controllerContext.ClientBuilder.ClientOrDie("disruption-controller")
config := controllerContext.ClientBuilder.ConfigOrDie("disruption-controller")
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
scaleClient, err := scale.NewForConfig(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
scaleClient, err := scale.NewForConfig(config, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, false, err
}
go disruption.NewDisruptionController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Policy().V1().PodDisruptionBudgets(),
ctx.InformerFactory.Core().V1().ReplicationControllers(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().StatefulSets(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Policy().V1().PodDisruptionBudgets(),
controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
controllerContext.InformerFactory.Apps().V1().Deployments(),
controllerContext.InformerFactory.Apps().V1().StatefulSets(),
client,
ctx.RESTMapper,
controllerContext.RESTMapper,
scaleClient,
client.Discovery(),
).Run(ctx.Stop)
).Run(ctx.Done())
return nil, true, nil
}


@ -17,18 +17,20 @@ limitations under the License.
package app
import (
"context"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/clusterroleaggregation"
)
func startClusterRoleAggregrationController(ctx ControllerContext) (controller.Interface, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] {
func startClusterRoleAggregrationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] {
return nil, false, nil
}
go clusterroleaggregation.NewClusterRoleAggregation(
ctx.InformerFactory.Rbac().V1().ClusterRoles(),
ctx.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(),
).Run(5, ctx.Stop)
controllerContext.InformerFactory.Rbac().V1().ClusterRoles(),
controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(),
).Run(5, ctx.Done())
return nil, true, nil
}
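
The AvailableResources lookup above is a capability gate: the controller reports itself as not enabled (nil, false, nil) rather than failing when the cluster does not serve a resource it depends on. A small sketch of that check in isolation (the map type mirrors ControllerContext.AvailableResources):

package app

import "k8s.io/apimachinery/pkg/runtime/schema"

// clusterRolesServed reports whether the RBAC v1 clusterroles resource is
// served by the cluster, the same condition gating the controller above.
func clusterRolesServed(available map[schema.GroupVersionResource]bool) bool {
    return available[schema.GroupVersionResource{
        Group:    "rbac.authorization.k8s.io",
        Version:  "v1",
        Resource: "clusterroles",
    }]
}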