Merge pull request #132703 from tchap/kcm-sync-api

kcm/app: Add proper goroutine management
This commit is contained in:
Kubernetes Prow Robot
2025-09-15 01:58:11 -07:00
committed by GitHub
23 changed files with 1691 additions and 937 deletions

View File

@@ -25,7 +25,6 @@ import (
"time"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
@@ -35,85 +34,115 @@ import (
func newDaemonSetControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.DaemonSetController,
aliases: []string{"daemonset"},
initFunc: startDaemonSetController,
name: names.DaemonSetController,
aliases: []string{"daemonset"},
constructor: newDaemonSetController,
}
}
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newDaemonSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("daemon-set-controller")
if err != nil {
return nil, err
}
dsc, err := daemon.NewDaemonSetsController(
ctx,
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"),
client,
flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
)
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
return nil, fmt.Errorf("error creating DaemonSets controller: %w", err)
}
go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs))
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {
dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs))
}, controllerName), nil
}
func newStatefulSetControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.StatefulSetController,
aliases: []string{"statefulset"},
initFunc: startStatefulSetController,
name: names.StatefulSetController,
aliases: []string{"statefulset"},
constructor: newStatefulSetController,
}
}
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
go statefulset.NewStatefulSetController(
func newStatefulSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("statefulset-controller")
if err != nil {
return nil, err
}
ssc := statefulset.NewStatefulSetController(
ctx,
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Apps().V1().StatefulSets(),
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(ctx, int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs))
return nil, true, nil
client,
)
return newControllerLoop(func(ctx context.Context) {
ssc.Run(ctx, int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs))
}, controllerName), nil
}
func newReplicaSetControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.ReplicaSetController,
aliases: []string{"replicaset"},
initFunc: startReplicaSetController,
name: names.ReplicaSetController,
aliases: []string{"replicaset"},
constructor: newReplicaSetController,
}
}
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
go replicaset.NewReplicaSetController(
func newReplicaSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("replicaset-controller")
if err != nil {
return nil, err
}
rsc := replicaset.NewReplicaSetController(
ctx,
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
client,
replicaset.BurstReplicas,
).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
return nil, true, nil
)
return newControllerLoop(func(ctx context.Context) {
rsc.Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
}, controllerName), nil
}
func newDeploymentControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.DeploymentController,
aliases: []string{"deployment"},
initFunc: startDeploymentController,
name: names.DeploymentController,
aliases: []string{"deployment"},
constructor: newDeploymentController,
}
}
func startDeploymentController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newDeploymentController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("deployment-controller")
if err != nil {
return nil, err
}
dc, err := deployment.NewDeploymentController(
ctx,
controllerContext.InformerFactory.Apps().V1().Deployments(),
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("deployment-controller"),
client,
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
return nil, fmt.Errorf("error creating Deployment controller: %w", err)
}
go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {
dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))
}, controllerName), nil
}

View File

@@ -21,14 +21,12 @@ package app
import (
"context"
"fmt"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/scale"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/podautoscaler"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
"k8s.io/metrics/pkg/client/custom_metrics"
"k8s.io/metrics/pkg/client/external_metrics"
@@ -36,47 +34,50 @@ import (
func newHorizontalPodAutoscalerControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.HorizontalPodAutoscalerController,
aliases: []string{"horizontalpodautoscaling"},
initFunc: startHorizontalPodAutoscalerControllerWithRESTClient,
name: names.HorizontalPodAutoscalerController,
aliases: []string{"horizontalpodautoscaling"},
constructor: newHorizontalPodAutoscalerController,
}
}
func startHorizontalPodAutoscalerControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newHorizontalPodAutoscalerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
clientConfig, err := controllerContext.NewClientConfig("horizontal-pod-autoscaler")
if err != nil {
return nil, err
}
clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
// invalidate the discovery information roughly once per resync interval our API
// information is *at most* two resync intervals old.
go custom_metrics.PeriodicallyInvalidate(
apiVersionsGetter,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.Done())
metricsClient := metrics.NewRESTMetricsClient(
resourceclient.NewForConfigOrDie(clientConfig),
custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
external_metrics.NewForConfigOrDie(clientConfig),
)
return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient)
}
func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) {
hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
hpaClient, err := controllerContext.NewClient("horizontal-pod-autoscaler")
if err != nil {
return nil, err
}
// we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
// so we want to re-fetch every time when we actually ask for it
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
scaleClient, err := scale.NewForConfig(clientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, false, err
return nil, fmt.Errorf("failed to init HPA scale client: %w", err)
}
go podautoscaler.NewHorizontalController(
apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
resourceClient, err := resourceclient.NewForConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("failed to init the resource client for %s: %w", controllerName, err)
}
externalMetricsClient, err := external_metrics.NewForConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("failed to init the external metrics client for %s: %w", controllerName, err)
}
metricsClient := metrics.NewRESTMetricsClient(
resourceClient,
custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
externalMetricsClient,
)
pas := podautoscaler.NewHorizontalController(
ctx,
hpaClient.CoreV1(),
scaleClient,
@@ -90,6 +91,16 @@ func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
).Run(ctx, int(controllerContext.ComponentConfig.HPAController.ConcurrentHorizontalPodAutoscalerSyncs))
return nil, true, nil
)
return newControllerLoop(concurrentRun(
func(ctx context.Context) {
custom_metrics.PeriodicallyInvalidate(
apiVersionsGetter,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.Done())
},
func(ctx context.Context) {
pas.Run(ctx, int(controllerContext.ComponentConfig.HPAController.ConcurrentHorizontalPodAutoscalerSyncs))
},
), controllerName), nil
}

View File

@@ -23,7 +23,6 @@ import (
"context"
"fmt"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job"
@@ -31,43 +30,58 @@ import (
func newJobControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.JobController,
aliases: []string{"job"},
initFunc: startJobController,
name: names.JobController,
aliases: []string{"job"},
constructor: newJobController,
}
}
func startJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
jobController, err := job.NewController(
func newJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("job-controller")
if err != nil {
return nil, err
}
jc, err := job.NewController(
ctx,
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.ClientBuilder.ClientOrDie("job-controller"),
client,
)
if err != nil {
return nil, true, fmt.Errorf("creating Job controller: %v", err)
return nil, fmt.Errorf("creating Job controller: %w", err)
}
go jobController.Run(ctx, int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs))
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {
jc.Run(ctx, int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs))
}, controllerName), nil
}
func newCronJobControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.CronJobController,
aliases: []string{"cronjob"},
initFunc: startCronJobController,
name: names.CronJobController,
aliases: []string{"cronjob"},
constructor: newCronJobController,
}
}
func startCronJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(),
func newCronJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("cronjob-controller")
if err != nil {
return nil, err
}
cj2c, err := cronjob.NewControllerV2(
ctx,
controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.InformerFactory.Batch().V1().CronJobs(),
controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
client,
)
if err != nil {
return nil, true, fmt.Errorf("creating CronJob controller V2: %v", err)
return nil, fmt.Errorf("creating CronJob controller V2: %w", err)
}
go cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs))
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {
cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs))
}, controllerName), nil
}

View File

@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/bootstrap"
)
@@ -29,41 +28,53 @@ func newBootstrapSignerControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.BootstrapSignerController,
aliases: []string{"bootstrapsigner"},
initFunc: startBootstrapSignerController,
constructor: newBootstrapSignerController,
isDisabledByDefault: true,
}
}
func startBootstrapSignerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newBootstrapSignerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("bootstrap-signer")
if err != nil {
return nil, err
}
bsc, err := bootstrap.NewSigner(
controllerContext.ClientBuilder.ClientOrDie("bootstrap-signer"),
client,
controllerContext.InformerFactory.Core().V1().Secrets(),
controllerContext.InformerFactory.Core().V1().ConfigMaps(),
bootstrap.DefaultSignerOptions(),
)
if err != nil {
return nil, true, fmt.Errorf("error creating BootstrapSigner controller: %v", err)
return nil, fmt.Errorf("error creating BootstrapSigner controller: %w", err)
}
go bsc.Run(ctx)
return nil, true, nil
return newControllerLoop(bsc.Run, controllerName), nil
}
func newTokenCleanerControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.TokenCleanerController,
aliases: []string{"tokencleaner"},
initFunc: startTokenCleanerController,
constructor: newTokenCleanerController,
isDisabledByDefault: true,
}
}
func startTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("token-cleaner")
if err != nil {
return nil, err
}
tcc, err := bootstrap.NewTokenCleaner(
controllerContext.ClientBuilder.ClientOrDie("token-cleaner"),
client,
controllerContext.InformerFactory.Core().V1().Secrets(),
bootstrap.DefaultTokenCleanerOptions(),
)
if err != nil {
return nil, true, fmt.Errorf("error creating TokenCleaner controller: %v", err)
return nil, fmt.Errorf("error creating TokenCleaner controller: %w", err)
}
go tcc.Run(ctx)
return nil, true, nil
return newControllerLoop(tcc.Run, controllerName), nil
}

View File

@@ -29,10 +29,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/component-base/featuregate"
"k8s.io/controller-manager/controller"
"k8s.io/klog/v2"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/certificates/approver"
@@ -47,33 +45,41 @@ import (
func newCertificateSigningRequestSigningControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.CertificateSigningRequestSigningController,
aliases: []string{"csrsigning"},
initFunc: startCertificateSigningRequestSigningController,
name: names.CertificateSigningRequestSigningController,
aliases: []string{"csrsigning"},
constructor: newCertificateSigningRequestSigningController,
}
}
func startCertificateSigningRequestSigningController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newCertificateSigningRequestSigningController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
logger := klog.FromContext(ctx)
missingSingleSigningFile := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || controllerContext.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == ""
if missingSingleSigningFile && !anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
logger.Info("Skipping CSR signer controller because no csr cert/key was specified")
return nil, false, nil
return nil, nil
}
if !missingSingleSigningFile && anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
return nil, false, fmt.Errorf("cannot specify default and per controller certs at the same time")
return nil, fmt.Errorf("cannot specify default and per controller certs at the same time")
}
c, err := controllerContext.NewClient("certificate-controller")
if err != nil {
return nil, err
}
c := controllerContext.ClientBuilder.ClientOrDie("certificate-controller")
csrInformer := controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests()
certTTL := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningDuration.Duration
var rx []runFunc
if kubeletServingSignerCertFile, kubeletServingSignerKeyFile := getKubeletServingSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletServingSignerCertFile) > 0 || len(kubeletServingSignerKeyFile) > 0 {
kubeletServingSigner, err := signer.NewKubeletServingCSRSigningController(ctx, c, csrInformer, kubeletServingSignerCertFile, kubeletServingSignerKeyFile, certTTL)
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err)
return nil, fmt.Errorf("failed to init kubernetes.io/kubelet-serving certificate controller: %w", err)
}
go kubeletServingSigner.Run(ctx, 5)
rx = append(rx, func(ctx context.Context) {
kubeletServingSigner.Run(ctx, 5)
})
} else {
logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kubelet-serving")
}
@@ -81,9 +87,12 @@ func startCertificateSigningRequestSigningController(ctx context.Context, contro
if kubeletClientSignerCertFile, kubeletClientSignerKeyFile := getKubeletClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletClientSignerCertFile) > 0 || len(kubeletClientSignerKeyFile) > 0 {
kubeletClientSigner, err := signer.NewKubeletClientCSRSigningController(ctx, c, csrInformer, kubeletClientSignerCertFile, kubeletClientSignerKeyFile, certTTL)
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err)
return nil, fmt.Errorf("failed to init kubernetes.io/kube-apiserver-client-kubelet certificate controller: %w", err)
}
go kubeletClientSigner.Run(ctx, 5)
rx = append(rx, func(ctx context.Context) {
kubeletClientSigner.Run(ctx, 5)
})
} else {
logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kube-apiserver-client-kubelet")
}
@@ -91,9 +100,12 @@ func startCertificateSigningRequestSigningController(ctx context.Context, contro
if kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile := getKubeAPIServerClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeAPIServerSignerCertFile) > 0 || len(kubeAPIServerSignerKeyFile) > 0 {
kubeAPIServerClientSigner, err := signer.NewKubeAPIServerClientCSRSigningController(ctx, c, csrInformer, kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile, certTTL)
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err)
return nil, fmt.Errorf("failed to init kubernetes.io/kube-apiserver-client certificate controller: %w", err)
}
go kubeAPIServerClientSigner.Run(ctx, 5)
rx = append(rx, func(ctx context.Context) {
kubeAPIServerClientSigner.Run(ctx, 5)
})
} else {
logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kube-apiserver-client")
}
@@ -101,14 +113,17 @@ func startCertificateSigningRequestSigningController(ctx context.Context, contro
if legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile := getLegacyUnknownSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(legacyUnknownSignerCertFile) > 0 || len(legacyUnknownSignerKeyFile) > 0 {
legacyUnknownSigner, err := signer.NewLegacyUnknownCSRSigningController(ctx, c, csrInformer, legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile, certTTL)
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err)
return nil, fmt.Errorf("failed to init kubernetes.io/legacy-unknown certificate controller: %w", err)
}
go legacyUnknownSigner.Run(ctx, 5)
rx = append(rx, func(ctx context.Context) {
legacyUnknownSigner.Run(ctx, 5)
})
} else {
logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/legacy-unknown")
}
return nil, true, nil
return newControllerLoop(concurrentRun(rx...), controllerName), nil
}
func areKubeletServingSignerFilesSpecified(config csrsigningconfig.CSRSigningControllerConfiguration) bool {
@@ -171,49 +186,60 @@ func getLegacyUnknownSignerFiles(config csrsigningconfig.CSRSigningControllerCon
func newCertificateSigningRequestApprovingControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.CertificateSigningRequestApprovingController,
aliases: []string{"csrapproving"},
initFunc: startCertificateSigningRequestApprovingController,
name: names.CertificateSigningRequestApprovingController,
aliases: []string{"csrapproving"},
constructor: newCertificateSigningRequestApprovingController,
}
}
func startCertificateSigningRequestApprovingController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
approver := approver.NewCSRApprovingController(
func newCertificateSigningRequestApprovingController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("certificate-controller")
if err != nil {
return nil, err
}
ac := approver.NewCSRApprovingController(
ctx,
controllerContext.ClientBuilder.ClientOrDie("certificate-controller"),
client,
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
)
go approver.Run(ctx, 5)
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {
ac.Run(ctx, 5)
}, controllerName), nil
}
func newCertificateSigningRequestCleanerControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.CertificateSigningRequestCleanerController,
aliases: []string{"csrcleaner"},
initFunc: startCertificateSigningRequestCleanerController,
name: names.CertificateSigningRequestCleanerController,
aliases: []string{"csrcleaner"},
constructor: newCertificateSigningRequestCleanerController,
}
}
func startCertificateSigningRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
cleaner := cleaner.NewCSRCleanerController(
controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(),
func newCertificateSigningRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("certificate-controller")
if err != nil {
return nil, err
}
cc := cleaner.NewCSRCleanerController(
client.CertificatesV1().CertificateSigningRequests(),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
)
go cleaner.Run(ctx, 1)
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {
cc.Run(ctx, 1)
}, controllerName), nil
}
func newPodCertificateRequestCleanerControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.PodCertificateRequestCleanerController,
initFunc: startPodCertificateRequestCleanerController,
name: names.PodCertificateRequestCleanerController,
constructor: newPodCertificateRequestCleanerController,
requiredFeatureGates: []featuregate.Feature{
features.PodCertificateRequest,
},
}
}
func startPodCertificateRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newPodCertificateRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
cleaner := cleaner.NewPCRCleanerController(
controllerContext.ClientBuilder.ClientOrDie("podcertificaterequestcleaner"),
controllerContext.InformerFactory.Certificates().V1alpha1().PodCertificateRequests(),
@@ -221,60 +247,69 @@ func startPodCertificateRequestCleanerController(ctx context.Context, controller
15*time.Minute, // We expect all PodCertificateRequest flows to complete faster than this.
5*time.Minute,
)
go cleaner.Run(ctx, 1)
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {
cleaner.Run(ctx, 1)
}, controllerName), nil
}
func newRootCACertificatePublisherControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.RootCACertificatePublisherController,
aliases: []string{"root-ca-cert-publisher"},
initFunc: startRootCACertificatePublisherController,
name: names.RootCACertificatePublisherController,
aliases: []string{"root-ca-cert-publisher"},
constructor: newRootCACertificatePublisherController,
}
}
func startRootCACertificatePublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newRootCACertificatePublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
rootCA, err := getKubeAPIServerCAFileContents(controllerContext)
if err != nil {
return nil, true, err
return nil, err
}
client, err := controllerContext.NewClient("root-ca-cert-publisher")
if err != nil {
return nil, err
}
sac, err := rootcacertpublisher.NewPublisher(
controllerContext.InformerFactory.Core().V1().ConfigMaps(),
controllerContext.InformerFactory.Core().V1().Namespaces(),
controllerContext.ClientBuilder.ClientOrDie("root-ca-cert-publisher"),
client,
rootCA,
)
if err != nil {
return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err)
return nil, fmt.Errorf("error creating root CA certificate publisher: %w", err)
}
go sac.Run(ctx, 1)
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {
sac.Run(ctx, 1)
}, controllerName), nil
}
func newKubeAPIServerSignerClusterTrustBundledPublisherDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.KubeAPIServerClusterTrustBundlePublisherController,
initFunc: newKubeAPIServerSignerClusterTrustBundledPublisherController,
constructor: newKubeAPIServerSignerClusterTrustBundledPublisherController,
requiredFeatureGates: []featuregate.Feature{features.ClusterTrustBundle},
}
}
type controllerConstructor func(string, dynamiccertificates.CAContentProvider, kubernetes.Interface) (ctbpublisher.PublisherRunner, error)
func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
func newKubeAPIServerSignerClusterTrustBundledPublisherController(
ctx context.Context, controllerContext ControllerContext, controllerName string,
) (Controller, error) {
rootCA, err := getKubeAPIServerCAFileContents(controllerContext)
if err != nil {
return nil, false, err
return nil, err
}
if len(rootCA) == 0 || !utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundle) {
return nil, false, nil
if len(rootCA) == 0 {
return nil, nil
}
servingSigners, err := dynamiccertificates.NewStaticCAContent("kube-apiserver-serving", rootCA)
if err != nil {
return nil, false, fmt.Errorf("failed to create a static CA content provider for the kube-apiserver-serving signer: %w", err)
return nil, fmt.Errorf("failed to create a static CA content provider for the kube-apiserver-serving signer: %w", err)
}
schemaControllerMapping := map[schema.GroupVersion]controllerConstructor{
@@ -282,12 +317,16 @@ func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Co
certificatesv1beta1.SchemeGroupVersion: ctbpublisher.NewBetaClusterTrustBundlePublisher,
}
apiserverSignerClient := controllerContext.ClientBuilder.ClientOrDie("kube-apiserver-serving-clustertrustbundle-publisher")
apiserverSignerClient, err := controllerContext.NewClient("kube-apiserver-serving-clustertrustbundle-publisher")
if err != nil {
return nil, err
}
var runner ctbpublisher.PublisherRunner
for _, gv := range []schema.GroupVersion{certificatesv1beta1.SchemeGroupVersion, certificatesv1alpha1.SchemeGroupVersion} {
ctbAvailable, err := clusterTrustBundlesAvailable(apiserverSignerClient, gv)
if err != nil {
return nil, false, fmt.Errorf("discovery failed for ClusterTrustBundle: %w", err)
return nil, fmt.Errorf("discovery failed for ClusterTrustBundle: %w", err)
}
if !ctbAvailable {
@@ -300,18 +339,17 @@ func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Co
apiserverSignerClient,
)
if err != nil {
return nil, false, fmt.Errorf("error creating kube-apiserver-serving signer certificates publisher: %w", err)
return nil, fmt.Errorf("error creating kube-apiserver-serving signer certificates publisher: %w", err)
}
break
}
if runner == nil {
klog.Info("no known scheme version was found for clustertrustbundles, cannot start kube-apiserver-serving-clustertrustbundle-publisher-controller")
return nil, false, nil
return nil, nil
}
go runner.Run(ctx)
return nil, true, nil
return newControllerLoop(runner.Run, controllerName), nil
}
func clusterTrustBundlesAvailable(client kubernetes.Interface, schemaVersion schema.GroupVersion) (bool, error) {
@@ -334,7 +372,11 @@ func clusterTrustBundlesAvailable(client kubernetes.Interface, schemaVersion sch
func getKubeAPIServerCAFileContents(controllerContext ControllerContext) ([]byte, error) {
if controllerContext.ComponentConfig.SAController.RootCAFile == "" {
return controllerContext.ClientBuilder.ConfigOrDie("root-ca-cert-publisher").CAData, nil
config, err := controllerContext.NewClientConfig("root-ca-cert-publisher")
if err != nil {
return nil, err
}
return config.CAData, nil
}
rootCA, err := readCA(controllerContext.ComponentConfig.SAController.RootCAFile)

View File

@@ -24,6 +24,7 @@ import (
basecompatibility "k8s.io/component-base/compatibility"
"k8s.io/component-base/zpages/flagz"
kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
"time"
)
// Config is the main context object for the controller manager.
@@ -49,6 +50,8 @@ type Config struct {
EventBroadcaster record.EventBroadcaster
EventRecorder record.EventRecorder
ControllerShutdownTimeout time.Duration
// ComponentGlobalsRegistry is the registry where the effective versions and feature gates for all components are stored.
ComponentGlobalsRegistry basecompatibility.ComponentGlobalsRegistry
}

View File

@@ -0,0 +1,247 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"context"
"fmt"
"sort"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
"k8s.io/klog/v2"
)
// This file contains types and functions for wrapping controller implementations from downstream packages.
// Every controller wrapper implements the Controller interface,
// which is then associated with a ControllerDescriptor, which holds additional static metadata
// needed so that the manager can manage Controllers properly.
// Controller defines the base interface that all controller wrappers must implement.
type Controller interface {
	// Name returns the controller's canonical name.
	Name() string

	// Run runs the controller loop.
	// When there is anything to be done, it blocks until the context is cancelled.
	// Run must ensure all goroutines are terminated before returning.
	Run(context.Context)
}
// ControllerConstructor is a constructor for a controller.
// A nil Controller returned means that the associated controller is disabled.
// controllerName is the canonical name of the controller being constructed,
// as returned by the associated descriptor's Name method.
type ControllerConstructor func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error)
// ControllerDescriptor holds the static metadata the manager needs to manage
// a single controller: its canonical name, constructor, aliases, required
// feature gates and special-handling flags.
type ControllerDescriptor struct {
	// name is the controller's canonical name; empty names are rejected at registration.
	name string
	// constructor builds the Controller; a nil constructor is rejected at registration.
	constructor ControllerConstructor
	// requiredFeatureGates must all be enabled for BuildController to build the controller.
	requiredFeatureGates []featuregate.Feature
	// aliases are alternative names kept for backwards compatibility (see GetAliases).
	aliases []string
	// isDisabledByDefault marks controllers reported by ControllersDisabledByDefault.
	isDisabledByDefault bool
	// isCloudProviderController marks controllers that BuildController skips.
	isCloudProviderController bool
	// requiresSpecialHandling marks controllers that are not built in the
	// generic loop (e.g. the service account token controller).
	requiresSpecialHandling bool
}
// Name returns the controller's canonical name.
func (r *ControllerDescriptor) Name() string {
	return r.name
}
// GetControllerConstructor returns the constructor used to build the controller.
// A nil constructor is rejected when descriptors are registered.
func (r *ControllerDescriptor) GetControllerConstructor() ControllerConstructor {
	return r.constructor
}
// GetRequiredFeatureGates returns a defensive copy of the feature gates that
// must all be enabled for this controller to be built. It returns nil when
// there are none.
func (r *ControllerDescriptor) GetRequiredFeatureGates() []featuregate.Feature {
	if len(r.requiredFeatureGates) == 0 {
		return nil
	}
	gates := make([]featuregate.Feature, len(r.requiredFeatureGates))
	copy(gates, r.requiredFeatureGates)
	return gates
}
// GetAliases returns a defensive copy of the controller's aliases, or nil when
// there are none. Aliases ensure backwards compatibility and should never be
// removed! Only addition of new aliases is allowed, and only when a canonical
// name is changed (please see CHANGE POLICY of controller names).
func (r *ControllerDescriptor) GetAliases() []string {
	if len(r.aliases) == 0 {
		return nil
	}
	aliases := make([]string, len(r.aliases))
	copy(aliases, r.aliases)
	return aliases
}
// IsDisabledByDefault reports whether the controller is off unless explicitly enabled.
func (r *ControllerDescriptor) IsDisabledByDefault() bool {
	return r.isDisabledByDefault
}
// IsCloudProviderController reports whether this controller belongs to the
// cloud provider; such controllers are skipped by BuildController.
func (r *ControllerDescriptor) IsCloudProviderController() bool {
	return r.isCloudProviderController
}
// RequiresSpecialHandling should return true only for special non-generic
// controllers like the ServiceAccountTokenController, which are not built in
// the main controller loop.
func (r *ControllerDescriptor) RequiresSpecialHandling() bool {
	return r.requiresSpecialHandling
}
// BuildController creates a controller based on the given descriptor.
// The associated controller's constructor is called at the end, so the same
// contract applies for the return values here: a nil Controller with a nil
// error means the controller is disabled.
func (r *ControllerDescriptor) BuildController(ctx context.Context, controllerCtx ControllerContext) (Controller, error) {
	logger := klog.FromContext(ctx)
	controllerName := r.Name()

	// Every required feature gate must be enabled, otherwise the controller is disabled.
	requiredGates := r.GetRequiredFeatureGates()
	for _, gate := range requiredGates {
		if utilfeature.DefaultFeatureGate.Enabled(gate) {
			continue
		}
		logger.Info("Controller is disabled by a feature gate",
			"controller", controllerName,
			"requiredFeatureGates", requiredGates)
		return nil, nil
	}

	// Cloud provider controllers are never built here.
	if r.IsCloudProviderController() {
		logger.Info("Skipping a cloud provider controller", "controller", controllerName)
		return nil, nil
	}

	// Give the controller its own named logger so its log entries are prefixed
	// with the controller name.
	ctx = klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName))
	return r.GetControllerConstructor()(ctx, controllerCtx, controllerName)
}
// KnownControllers returns all known controllers' name
func KnownControllers() []string {
return sets.StringKeySet(NewControllerDescriptors()).List()
}
// ControllerAliases returns a mapping of aliases to canonical controller names.
func ControllerAliases() map[string]string {
	result := make(map[string]string)
	for canonicalName, descriptor := range NewControllerDescriptors() {
		for _, alias := range descriptor.GetAliases() {
			result[alias] = canonicalName
		}
	}
	return result
}
// ControllersDisabledByDefault returns the sorted names of all controllers
// that are disabled unless explicitly enabled.
func ControllersDisabledByDefault() []string {
	var disabled []string
	for name, descriptor := range NewControllerDescriptors() {
		if !descriptor.IsDisabledByDefault() {
			continue
		}
		disabled = append(disabled, name)
	}
	sort.Strings(disabled)
	return disabled
}
// NewControllerDescriptors is a public map of named controller groups (you can start more than one in an init func)
// paired to their ControllerDescriptor wrapper object that includes the associated controller constructor.
// This allows for structured downstream composition and subdivision.
func NewControllerDescriptors() map[string]*ControllerDescriptor {
	controllers := map[string]*ControllerDescriptor{}
	aliases := sets.NewString()

	// All the controllers must fulfil common constraints, or else we will explode.
	// register panics on any violation (nil descriptor, missing name or
	// constructor, duplicate name or alias) so misregistration is caught early.
	register := func(controllerDesc *ControllerDescriptor) {
		if controllerDesc == nil {
			panic("received nil controller for a registration")
		}
		name := controllerDesc.Name()
		if len(name) == 0 {
			panic("received controller without a name for a registration")
		}
		if _, found := controllers[name]; found {
			panic(fmt.Sprintf("controller name %q was registered twice", name))
		}
		if controllerDesc.GetControllerConstructor() == nil {
			panic(fmt.Sprintf("controller %q does not have a constructor specified", name))
		}
		for _, alias := range controllerDesc.GetAliases() {
			if aliases.Has(alias) {
				panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias))
			}
			aliases.Insert(alias)
		}
		controllers[name] = controllerDesc
	}

	// First add "special" controllers that aren't initialized normally. These controllers cannot be initialized
	// in the main controller loop initialization, so we add them here only for the metadata and duplication detection.
	// app.ControllerDescriptor#RequiresSpecialHandling should return true for such controllers
	// The only known special case is the ServiceAccountTokenController which *must* be started
	// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding new
	// special controllers.
	register(newServiceAccountTokenControllerDescriptor(nil))

	register(newEndpointsControllerDescriptor())
	register(newEndpointSliceControllerDescriptor())
	register(newEndpointSliceMirroringControllerDescriptor())
	register(newReplicationControllerDescriptor())
	register(newPodGarbageCollectorControllerDescriptor())
	register(newResourceQuotaControllerDescriptor())
	register(newNamespaceControllerDescriptor())
	register(newServiceAccountControllerDescriptor())
	register(newGarbageCollectorControllerDescriptor())
	register(newDaemonSetControllerDescriptor())
	register(newJobControllerDescriptor())
	register(newDeploymentControllerDescriptor())
	register(newReplicaSetControllerDescriptor())
	register(newHorizontalPodAutoscalerControllerDescriptor())
	register(newDisruptionControllerDescriptor())
	register(newStatefulSetControllerDescriptor())
	register(newCronJobControllerDescriptor())
	register(newCertificateSigningRequestSigningControllerDescriptor())
	register(newCertificateSigningRequestApprovingControllerDescriptor())
	register(newCertificateSigningRequestCleanerControllerDescriptor())
	register(newPodCertificateRequestCleanerControllerDescriptor())
	register(newTTLControllerDescriptor())
	register(newBootstrapSignerControllerDescriptor())
	register(newTokenCleanerControllerDescriptor())
	register(newNodeIpamControllerDescriptor())
	register(newNodeLifecycleControllerDescriptor())

	register(newServiceLBControllerDescriptor())          // cloud provider controller
	register(newNodeRouteControllerDescriptor())          // cloud provider controller
	register(newCloudNodeLifecycleControllerDescriptor()) // cloud provider controller

	register(newPersistentVolumeBinderControllerDescriptor())
	register(newPersistentVolumeAttachDetachControllerDescriptor())
	register(newPersistentVolumeExpanderControllerDescriptor())
	register(newClusterRoleAggregrationControllerDescriptor())
	register(newPersistentVolumeClaimProtectionControllerDescriptor())
	register(newPersistentVolumeProtectionControllerDescriptor())
	register(newVolumeAttributesClassProtectionControllerDescriptor())
	register(newTTLAfterFinishedControllerDescriptor())
	register(newRootCACertificatePublisherControllerDescriptor())
	register(newKubeAPIServerSignerClusterTrustBundledPublisherDescriptor())
	register(newEphemeralVolumeControllerDescriptor())

	// feature gated
	register(newStorageVersionGarbageCollectorControllerDescriptor())
	register(newResourceClaimControllerDescriptor())
	register(newDeviceTaintEvictionControllerDescriptor())
	register(newLegacyServiceAccountTokenCleanerControllerDescriptor())
	register(newValidatingAdmissionPolicyStatusControllerDescriptor())
	register(newTaintEvictionControllerDescriptor())
	register(newServiceCIDRsControllerDescriptor())
	register(newStorageVersionMigratorControllerDescriptor())
	register(newSELinuxWarningControllerDescriptor())

	// Finally, make sure no alias collides with a canonical controller name.
	for _, alias := range aliases.UnsortedList() {
		if _, ok := controllers[alias]; ok {
			panic(fmt.Sprintf("alias %q conflicts with a controller name", alias))
		}
	}

	return controllers
}

View File

@@ -0,0 +1,67 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"context"
"sync"
)
// This file contains utility functions for implementing controller wrappers,
// i.e. turning whatever logic into a Controller.
type runFunc func(ctx context.Context)
type runFuncSlice []runFunc
func (rx runFuncSlice) Run(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(len(rx))
for _, fnc := range rx {
go func() {
defer wg.Done()
fnc(ctx)
}()
}
wg.Wait()
}
// concurrentRun combines the given functions into a single runFunc that
// executes all of them concurrently and waits for every one to finish.
func concurrentRun(rx ...runFunc) runFunc {
	return func(ctx context.Context) {
		runFuncSlice(rx).Run(ctx)
	}
}
// controllerLoop is a minimal Controller implementation that delegates its
// Run method to a plain function. It makes it easy to turn a function into a
// Controller.
type controllerLoop struct {
	name string
	run  runFunc
}

// newControllerLoop wraps the given run function in a controllerLoop carrying
// the given controller name.
func newControllerLoop(run runFunc, controllerName string) *controllerLoop {
	loop := &controllerLoop{}
	loop.name = controllerName
	loop.run = run
	return loop
}

// Name returns the controller name the loop was created with.
func (loop *controllerLoop) Name() string {
	return loop.name
}

// Run invokes the wrapped run function and blocks until it returns.
func (loop *controllerLoop) Run(ctx context.Context) {
	loop.run(ctx)
}

View File

@@ -25,7 +25,7 @@ import (
"math/rand"
"net/http"
"os"
"sort"
"sync"
"time"
"github.com/blang/semver/v4"
@@ -43,6 +43,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
@@ -51,7 +52,6 @@ import (
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/keyutil"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
basecompatibility "k8s.io/component-base/compatibility"
@@ -80,9 +80,7 @@ import (
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
garbagecollector "k8s.io/kubernetes/pkg/controller/garbagecollector"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/serviceaccount"
)
func init() {
@@ -253,16 +251,27 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if err := StartControllers(ctx, controllerContext, controllerDescriptors, unsecuredMux, healthzHandler); err != nil {
logger.Error(err, "Error starting controllers")
// Prepare all controllers in advance.
controllers, err := BuildControllers(ctx, controllerContext, controllerDescriptors, unsecuredMux, healthzHandler)
if err != nil {
logger.Error(err, "Error building controllers")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
// Start the informers.
stopCh := ctx.Done()
controllerContext.InformerFactory.Start(stopCh)
controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted)
<-ctx.Done()
// Actually start the controllers.
if len(controllers) > 0 {
if !RunControllers(ctx, controllerContext, controllers, ControllerStartJitter, c.ControllerShutdownTimeout) {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
} else {
<-ctx.Done()
}
}
// No leader election, run directly
@@ -291,14 +300,23 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
kubeControllerManager)
// startSATokenControllerInit is the original InitFunc.
startSATokenControllerInit := saTokenControllerDescriptor.GetInitFunc()
// startSATokenControllerInit is the original constructor.
saTokenControllerInit := saTokenControllerDescriptor.GetControllerConstructor()
// Wrap saTokenControllerDescriptor to signal readiness for migration after starting
// the controller.
saTokenControllerDescriptor.initFunc = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
defer close(leaderMigrator.MigrationReady)
return startSATokenControllerInit(ctx, controllerContext, controllerName)
// Wrap saTokenControllerDescriptor to signal readiness for migration after starting the controller.
saTokenControllerDescriptor.constructor = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
ctrl, err := saTokenControllerInit(ctx, controllerContext, controllerName)
if err != nil {
return nil, err
}
// This wrapping is not exactly flawless as RunControllers uses type casting,
// which is now not possible for the wrapped controller.
// This fortunately doesn't matter for this particular controller.
return newControllerLoop(func(ctx context.Context) {
close(leaderMigrator.MigrationReady)
ctrl.Run(ctx)
}, controllerName), nil
}
}
@@ -431,188 +449,22 @@ func (c ControllerContext) IsControllerEnabled(controllerDescriptor *ControllerD
return genericcontrollermanager.IsControllerEnabled(controllerDescriptor.Name(), controllersDisabledByDefault, c.ComponentConfig.Generic.Controllers)
}
// InitFunc is used to launch a particular controller. It returns a controller
// that can optionally implement other interfaces so that the controller manager
// can support the requested features.
// The returned controller may be nil, which will be considered an anonymous controller
// that requests no additional features from the controller manager.
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
type InitFunc func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller controller.Interface, enabled bool, err error)
type ControllerDescriptor struct {
name string
initFunc InitFunc
requiredFeatureGates []featuregate.Feature
aliases []string
isDisabledByDefault bool
isCloudProviderController bool
requiresSpecialHandling bool
}
func (r *ControllerDescriptor) Name() string {
return r.name
}
func (r *ControllerDescriptor) GetInitFunc() InitFunc {
return r.initFunc
}
func (r *ControllerDescriptor) GetRequiredFeatureGates() []featuregate.Feature {
return append([]featuregate.Feature(nil), r.requiredFeatureGates...)
}
// GetAliases returns aliases to ensure backwards compatibility and should never be removed!
// Only addition of new aliases is allowed, and only when a canonical name is changed (please see CHANGE POLICY of controller names)
func (r *ControllerDescriptor) GetAliases() []string {
return append([]string(nil), r.aliases...)
}
func (r *ControllerDescriptor) IsDisabledByDefault() bool {
return r.isDisabledByDefault
}
func (r *ControllerDescriptor) IsCloudProviderController() bool {
return r.isCloudProviderController
}
// RequiresSpecialHandling should return true only in a special non-generic controllers like ServiceAccountTokenController
func (r *ControllerDescriptor) RequiresSpecialHandling() bool {
return r.requiresSpecialHandling
}
// KnownControllers returns all known controllers's name
func KnownControllers() []string {
return sets.StringKeySet(NewControllerDescriptors()).List()
}
// ControllerAliases returns a mapping of aliases to canonical controller names
func ControllerAliases() map[string]string {
aliases := map[string]string{}
for name, c := range NewControllerDescriptors() {
for _, alias := range c.GetAliases() {
aliases[alias] = name
}
// NewClientConfig is a shortcut for ClientBuilder.Config. It wraps the error with an additional message.
func (c ControllerContext) NewClientConfig(name string) (*restclient.Config, error) {
config, err := c.ClientBuilder.Config(name)
if err != nil {
return nil, fmt.Errorf("failed to create Kubernetes client config for %q: %w", name, err)
}
return aliases
return config, nil
}
func ControllersDisabledByDefault() []string {
var controllersDisabledByDefault []string
for name, c := range NewControllerDescriptors() {
if c.IsDisabledByDefault() {
controllersDisabledByDefault = append(controllersDisabledByDefault, name)
}
// NewClient is a shortcut for ClientBuilder.Client. It wraps the error with an additional message.
func (c ControllerContext) NewClient(name string) (kubernetes.Interface, error) {
client, err := c.ClientBuilder.Client(name)
if err != nil {
return nil, fmt.Errorf("failed to create Kubernetes client for %q: %w", name, err)
}
sort.Strings(controllersDisabledByDefault)
return controllersDisabledByDefault
}
// NewControllerDescriptors is a public map of named controller groups (you can start more than one in an init func)
// paired to their ControllerDescriptor wrapper object that includes InitFunc.
// This allows for structured downstream composition and subdivision.
func NewControllerDescriptors() map[string]*ControllerDescriptor {
controllers := map[string]*ControllerDescriptor{}
aliases := sets.NewString()
// All the controllers must fulfil common constraints, or else we will explode.
register := func(controllerDesc *ControllerDescriptor) {
if controllerDesc == nil {
panic("received nil controller for a registration")
}
name := controllerDesc.Name()
if len(name) == 0 {
panic("received controller without a name for a registration")
}
if _, found := controllers[name]; found {
panic(fmt.Sprintf("controller name %q was registered twice", name))
}
if controllerDesc.GetInitFunc() == nil {
panic(fmt.Sprintf("controller %q does not have an init function", name))
}
for _, alias := range controllerDesc.GetAliases() {
if aliases.Has(alias) {
panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias))
}
aliases.Insert(alias)
}
controllers[name] = controllerDesc
}
// First add "special" controllers that aren't initialized normally. These controllers cannot be initialized
// in the main controller loop initialization, so we add them here only for the metadata and duplication detection.
// app.ControllerDescriptor#RequiresSpecialHandling should return true for such controllers
// The only known special case is the ServiceAccountTokenController which *must* be started
// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding new
// special controllers.
register(newServiceAccountTokenControllerDescriptor(nil))
register(newEndpointsControllerDescriptor())
register(newEndpointSliceControllerDescriptor())
register(newEndpointSliceMirroringControllerDescriptor())
register(newReplicationControllerDescriptor())
register(newPodGarbageCollectorControllerDescriptor())
register(newResourceQuotaControllerDescriptor())
register(newNamespaceControllerDescriptor())
register(newServiceAccountControllerDescriptor())
register(newGarbageCollectorControllerDescriptor())
register(newDaemonSetControllerDescriptor())
register(newJobControllerDescriptor())
register(newDeploymentControllerDescriptor())
register(newReplicaSetControllerDescriptor())
register(newHorizontalPodAutoscalerControllerDescriptor())
register(newDisruptionControllerDescriptor())
register(newStatefulSetControllerDescriptor())
register(newCronJobControllerDescriptor())
register(newCertificateSigningRequestSigningControllerDescriptor())
register(newCertificateSigningRequestApprovingControllerDescriptor())
register(newCertificateSigningRequestCleanerControllerDescriptor())
register(newPodCertificateRequestCleanerControllerDescriptor())
register(newTTLControllerDescriptor())
register(newBootstrapSignerControllerDescriptor())
register(newTokenCleanerControllerDescriptor())
register(newNodeIpamControllerDescriptor())
register(newNodeLifecycleControllerDescriptor())
register(newServiceLBControllerDescriptor()) // cloud provider controller
register(newNodeRouteControllerDescriptor()) // cloud provider controller
register(newCloudNodeLifecycleControllerDescriptor()) // cloud provider controller
register(newPersistentVolumeBinderControllerDescriptor())
register(newPersistentVolumeAttachDetachControllerDescriptor())
register(newPersistentVolumeExpanderControllerDescriptor())
register(newClusterRoleAggregrationControllerDescriptor())
register(newPersistentVolumeClaimProtectionControllerDescriptor())
register(newPersistentVolumeProtectionControllerDescriptor())
register(newVolumeAttributesClassProtectionControllerDescriptor())
register(newTTLAfterFinishedControllerDescriptor())
register(newRootCACertificatePublisherControllerDescriptor())
register(newKubeAPIServerSignerClusterTrustBundledPublisherDescriptor())
register(newEphemeralVolumeControllerDescriptor())
// feature gated
register(newStorageVersionGarbageCollectorControllerDescriptor())
register(newResourceClaimControllerDescriptor())
register(newDeviceTaintEvictionControllerDescriptor())
register(newLegacyServiceAccountTokenCleanerControllerDescriptor())
register(newValidatingAdmissionPolicyStatusControllerDescriptor())
register(newTaintEvictionControllerDescriptor())
register(newServiceCIDRsControllerDescriptor())
register(newStorageVersionMigratorControllerDescriptor())
register(newSELinuxWarningControllerDescriptor())
for _, alias := range aliases.UnsortedList() {
if _, ok := controllers[alias]; ok {
panic(fmt.Sprintf("alias %q conflicts with a controller name", alias))
}
}
return controllers
return client, nil
}
// CreateControllerContext creates a context struct containing references to resources needed by the
@@ -629,20 +481,37 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo
return obj, nil
}
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
versionedClient, err := rootClientBuilder.Client("shared-informers")
if err != nil {
return ControllerContext{}, fmt.Errorf("failed to create Kubernetes client for %q: %w", "shared-informers", err)
}
sharedInformers := informers.NewSharedInformerFactoryWithOptions(versionedClient, ResyncPeriod(s)(), informers.WithTransform(trim))
metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
metadataConfig, err := rootClientBuilder.Config("metadata-informers")
if err != nil {
return ControllerContext{}, fmt.Errorf("failed to create metadata client config: %w", err)
}
metadataClient, err := metadata.NewForConfig(metadataConfig)
if err != nil {
return ControllerContext{}, fmt.Errorf("failed to create metadata client: %w", err)
}
metadataInformers := metadatainformer.NewSharedInformerFactoryWithOptions(metadataClient, ResyncPeriod(s)(), metadatainformer.WithTransform(trim))
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %w", err)
}
// Use a discovery client capable of being refreshed.
discoveryClient := rootClientBuilder.DiscoveryClientOrDie("controller-discovery")
discoveryClient, err := rootClientBuilder.DiscoveryClient("controller-discovery")
if err != nil {
return ControllerContext{}, fmt.Errorf("failed to create discovery client: %w", err)
}
cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
go wait.Until(func() {
@@ -681,95 +550,38 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo
return controllerContext, nil
}
// StartControllers starts a set of controllers with a specified ControllerContext
func StartControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor,
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
var controllerChecks []healthz.HealthChecker
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
// If this fails, just return here and fail since other controllers won't be able to get credentials.
if serviceAccountTokenControllerDescriptor, ok := controllerDescriptors[names.ServiceAccountTokenController]; ok {
check, err := StartController(ctx, controllerCtx, serviceAccountTokenControllerDescriptor, unsecuredMux)
if err != nil {
return err
}
if check != nil {
// HealthChecker should be present when controller has started
controllerChecks = append(controllerChecks, check)
}
}
// Each controller is passed a context where the logger has the name of
// the controller set through WithName. That name then becomes the prefix of
// of all log messages emitted by that controller.
//
// In StartController, an explicit "controller" key is used instead, for two reasons:
// - while contextual logging is alpha, klog.LoggerWithName is still a no-op,
// so we cannot rely on it yet to add the name
// - it allows distinguishing between log entries emitted by the controller
// and those emitted for it - this is a bit debatable and could be revised.
for _, controllerDesc := range controllerDescriptors {
if controllerDesc.RequiresSpecialHandling() {
continue
}
check, err := StartController(ctx, controllerCtx, controllerDesc, unsecuredMux)
if err != nil {
return err
}
if check != nil {
// HealthChecker should be present when controller has started
controllerChecks = append(controllerChecks, check)
}
}
healthzHandler.AddHealthChecker(controllerChecks...)
return nil
// HealthCheckAdder is an interface to represent a healthz handler.
// The extra level of indirection is useful for testing.
type HealthCheckAdder interface {
AddHealthChecker(checks ...healthz.HealthChecker)
}
// StartController starts a controller with a specified ControllerContext
// and performs required pre- and post- checks/actions
func StartController(ctx context.Context, controllerCtx ControllerContext, controllerDescriptor *ControllerDescriptor,
unsecuredMux *mux.PathRecorderMux) (healthz.HealthChecker, error) {
// BuildControllers builds all controllers in the given descriptor map. Disabled controllers are obviously skipped.
//
// A health check is registered for each controller using the controller name. The default check always passes.
// If the controller implements controller.HealthCheckable, though, the given check is used.
// The controller can also implement controller.Debuggable, in which case the debug handler is registered with the given mux.
func BuildControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor,
unsecuredMux *mux.PathRecorderMux, healthzHandler HealthCheckAdder) ([]Controller, error) {
logger := klog.FromContext(ctx)
controllerName := controllerDescriptor.Name()
for _, featureGate := range controllerDescriptor.GetRequiredFeatureGates() {
if !utilfeature.DefaultFeatureGate.Enabled(featureGate) {
logger.Info("Controller is disabled by a feature gate", "controller", controllerName, "requiredFeatureGates", controllerDescriptor.GetRequiredFeatureGates())
return nil, nil
var (
controllers []Controller
checks []healthz.HealthChecker
)
buildController := func(controllerDesc *ControllerDescriptor) error {
controllerName := controllerDesc.Name()
ctrl, err := controllerDesc.BuildController(ctx, controllerCtx)
if err != nil {
logger.Error(err, "Error initializing a controller", "controller", controllerName)
return err
}
if ctrl == nil {
logger.Info("Warning: skipping controller", "controller", controllerName)
return nil
}
}
if controllerDescriptor.IsCloudProviderController() {
logger.Info("Skipping a cloud provider controller", "controller", controllerName)
return nil, nil
}
if !controllerCtx.IsControllerEnabled(controllerDescriptor) {
logger.Info("Warning: controller is disabled", "controller", controllerName)
return nil, nil
}
time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
logger.V(1).Info("Starting controller", "controller", controllerName)
initFunc := controllerDescriptor.GetInitFunc()
ctrl, started, err := initFunc(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx, controllerName)
if err != nil {
logger.Error(err, "Error starting controller", "controller", controllerName)
return nil, err
}
if !started {
logger.Info("Warning: skipping controller", "controller", controllerName)
return nil, nil
}
check := controllerhealthz.NamedPingChecker(controllerName)
if ctrl != nil {
// check if the controller supports and requests a debugHandler
check := controllerhealthz.NamedPingChecker(controllerName)
// check if the controller supports and requests a debugHandler,
// and it needs the unsecuredMux to mount the handler onto.
if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
@@ -783,69 +595,152 @@ func StartController(ctx context.Context, controllerCtx ControllerContext, contr
check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
}
}
controllers = append(controllers, ctrl)
checks = append(checks, check)
return nil
}
logger.Info("Started controller", "controller", controllerName)
return check, nil
}
// serviceAccountTokenControllerStarter is special because it must run first to set up permissions for other controllers.
// It cannot use the "normal" client builder, so it tracks its own.
func newServiceAccountTokenControllerDescriptor(rootClientBuilder clientbuilder.ControllerClientBuilder) *ControllerDescriptor {
return &ControllerDescriptor{
name: names.ServiceAccountTokenController,
aliases: []string{"serviceaccount-token"},
initFunc: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
return startServiceAccountTokenController(ctx, controllerContext, controllerName, rootClientBuilder)
},
// will make sure it runs first before other controllers
requiresSpecialHandling: true,
}
}
func startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext, controllerName string, rootClientBuilder clientbuilder.ControllerClientBuilder) (controller.Interface, bool, error) {
logger := klog.FromContext(ctx)
if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
logger.Info("Controller is disabled because there is no private key", "controller", controllerName)
return nil, false, nil
}
privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile)
if err != nil {
return nil, true, fmt.Errorf("error reading key for service account token controller: %v", err)
}
var rootCA []byte
if controllerContext.ComponentConfig.SAController.RootCAFile != "" {
if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil {
return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err)
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
// If this fails, just return here and fail since other controllers won't be able to get credentials.
if serviceAccountTokenControllerDescriptor, ok := controllerDescriptors[names.ServiceAccountTokenController]; ok {
if err := buildController(serviceAccountTokenControllerDescriptor); err != nil {
return nil, err
}
} else {
rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
}
tokenGenerator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey)
if err != nil {
return nil, false, fmt.Errorf("failed to build token generator: %v", err)
}
tokenController, err := serviceaccountcontroller.NewTokensController(
logger,
controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
controllerContext.InformerFactory.Core().V1().Secrets(),
rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: tokenGenerator,
RootCA: rootCA,
},
)
if err != nil {
return nil, true, fmt.Errorf("error creating Tokens controller: %v", err)
}
go tokenController.Run(ctx, int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs))
// Each controller is passed a context where the logger has the name of
// the controller set through WithName. That name then becomes the prefix of
// all log messages emitted by that controller.
//
// In StartController, an explicit "controller" key is used instead, for two reasons:
// - while contextual logging is alpha, klog.LoggerWithName is still a no-op,
// so we cannot rely on it yet to add the name
// - it allows distinguishing between log entries emitted by the controller
// and those emitted for it - this is a bit debatable and could be revised.
for _, controllerDesc := range controllerDescriptors {
if controllerDesc.RequiresSpecialHandling() {
continue
}
// start the first set of informers now so that other controllers can start
controllerContext.InformerFactory.Start(ctx.Done())
if !controllerCtx.IsControllerEnabled(controllerDesc) {
logger.Info("Warning: controller is disabled", "controller", controllerDesc.Name())
continue
}
return nil, true, nil
if err := buildController(controllerDesc); err != nil {
return nil, err
}
}
// Register the checks.
if len(checks) > 0 {
healthzHandler.AddHealthChecker(checks...)
}
return controllers, nil
}
// RunControllers runs all controllers concurrently and blocks until the context is cancelled and all controllers are terminated.
//
// Once the context is cancelled, RunControllers waits for shutdownTimeout for all controllers to terminate.
// When the timeout is reached, the function unblocks and returns false.
// Zero shutdown timeout means that there is no timeout.
func RunControllers(ctx context.Context, controllerCtx ControllerContext, controllers []Controller,
	controllerStartJitterMaxFactor float64, shutdownTimeout time.Duration) bool {
	logger := klog.FromContext(ctx)
	// We gather running controllers names for logging purposes.
	// When the context is cancelled, the controllers still running are logged periodically.
	// The set is shared between the logging goroutine and the per-controller
	// goroutines below, hence the dedicated mutex.
	runningControllers := sets.New[string]()
	var runningControllersLock sync.Mutex
	// loggingCtx is derived from context.Background() on purpose: it must
	// outlive ctx (which is already cancelled while we log), and is cancelled
	// via defer when this function returns so the logging goroutine exits.
	loggingCtx, cancelLoggingCtx := context.WithCancel(context.Background())
	defer cancelLoggingCtx()
	go func() {
		// Only start logging when terminating.
		select {
		case <-ctx.Done():
		case <-loggingCtx.Done():
			return
		}
		// Regularly print the controllers that still haven't returned.
		// Integer division may yield 0 for small timeouts; 5s is also the
		// fallback period when shutdownTimeout itself is zero (no timeout).
		logPeriod := shutdownTimeout / 3
		if logPeriod == 0 {
			logPeriod = 5 * time.Second
		}
		ticker := time.NewTicker(logPeriod)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				runningControllersLock.Lock()
				running := sets.List(runningControllers)
				runningControllersLock.Unlock()
				logger.Info("Still waiting for some controllers to terminate...", "runningControllers", running)
			case <-loggingCtx.Done():
				return
			}
		}
	}()
	// terminatedCh is closed once every controller goroutine has returned.
	terminatedCh := make(chan struct{})
	go func() {
		defer close(terminatedCh)
		var wg sync.WaitGroup
		wg.Add(len(controllers))
		for _, controller := range controllers {
			go func() {
				defer wg.Done()
				// It would be better to unblock and return on context cancelled here,
				// but that makes tests more flaky regarding timing.
				time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, controllerStartJitterMaxFactor))
				logger.V(1).Info("Controller starting...", "controller", controller.Name())
				runningControllersLock.Lock()
				runningControllers.Insert(controller.Name())
				runningControllersLock.Unlock()
				// Deregister on exit so the periodic log above only lists
				// controllers that are actually still running.
				defer func() {
					logger.V(1).Info("Controller terminated", "controller", controller.Name())
					runningControllersLock.Lock()
					runningControllers.Delete(controller.Name())
					runningControllersLock.Unlock()
				}()
				// Run blocks until the controller terminates, typically when
				// ctx is cancelled.
				controller.Run(ctx)
			}()
		}
		wg.Wait()
		logger.Info("All controllers terminated")
	}()
	// Wait for a signal to terminate.
	select {
	case <-ctx.Done():
	case <-terminatedCh:
		return true
	}
	// Wait for the shutdown timeout.
	// A nil shutdownCh (shutdownTimeout <= 0) makes the select below block
	// until terminatedCh closes, i.e. no timeout.
	var shutdownCh <-chan time.Time
	if shutdownTimeout > 0 {
		shutdownCh = time.After(shutdownTimeout)
	}
	select {
	case <-terminatedCh:
		return true
	case <-shutdownCh:
		// Timeout reached: some controller goroutines are still running and
		// will be abandoned; report which ones and return false.
		runningControllersLock.Lock()
		running := sets.List(runningControllers)
		runningControllersLock.Unlock()
		logger.Info("Controller shutdown timeout reached", "timeout", shutdownTimeout, "runningControllers", running)
		return false
	}
}
func readCA(file string) ([]byte, error) {

View File

@@ -21,16 +21,17 @@ import (
"regexp"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cpnames "k8s.io/cloud-provider/names"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
controllermanagercontroller "k8s.io/controller-manager/controller"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/features"
@@ -206,19 +207,21 @@ func TestTaintEvictionControllerGating(t *testing.T) {
initFuncCalled := false
taintEvictionControllerDescriptor := NewControllerDescriptors()[names.TaintEvictionController]
taintEvictionControllerDescriptor.initFunc = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller controllermanagercontroller.Interface, enabled bool, err error) {
taintEvictionControllerDescriptor.constructor = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
initFuncCalled = true
return nil, true, nil
return newControllerLoop(func(ctx context.Context) {}, controllerName), nil
}
healthCheck, err := StartController(ctx, controllerCtx, taintEvictionControllerDescriptor, nil)
if err != nil {
var healthChecks mockHealthCheckAdder
if err := runControllers(ctx, controllerCtx, map[string]*ControllerDescriptor{
names.TaintEvictionController: taintEvictionControllerDescriptor,
}, &healthChecks); err != nil {
t.Errorf("starting a TaintEvictionController controller should not return an error")
}
if test.expectInitFuncCall != initFuncCalled {
t.Errorf("TaintEvictionController init call check failed: expected=%v, got=%v", test.expectInitFuncCall, initFuncCalled)
}
hasHealthCheck := healthCheck != nil
hasHealthCheck := len(healthChecks.Checks) > 0
expectHealthCheck := test.expectInitFuncCall
if expectHealthCheck != hasHealthCheck {
t.Errorf("TaintEvictionController healthCheck check failed: expected=%v, got=%v", expectHealthCheck, hasHealthCheck)
@@ -229,23 +232,96 @@ func TestTaintEvictionControllerGating(t *testing.T) {
func TestNoCloudProviderControllerStarted(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controllerCtx := ControllerContext{}
controllerCtx.ComponentConfig.Generic.Controllers = []string{"*"}
for _, controller := range NewControllerDescriptors() {
cpControllerDescriptors := make(map[string]*ControllerDescriptor)
for controllerName, controller := range NewControllerDescriptors() {
if !controller.IsCloudProviderController() {
continue
}
controllerName := controller.Name()
checker, err := StartController(ctx, controllerCtx, controller, nil)
if err != nil {
t.Errorf("Error starting controller %q: %v", controllerName, err)
}
if checker != nil {
t.Errorf("Controller %q should not be started", controllerName)
controller.constructor = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
return newControllerLoop(func(ctx context.Context) {
t.Error("Controller should not be started:", controllerName)
}, controllerName), nil
}
cpControllerDescriptors[controllerName] = controller
}
var healthChecks mockHealthCheckAdder
if err := runControllers(ctx, controllerCtx, cpControllerDescriptors, &healthChecks); err != nil {
t.Error("Failed to start controllers:", err)
}
}
// TestRunControllers verifies that RunControllers reports a clean shutdown
// when all controllers stop in time, and reports failure when the shutdown
// timeout elapses first.
func TestRunControllers(t *testing.T) {
	cases := []struct {
		name                     string
		newController            func(ctx context.Context) Controller
		shutdownTimeout          time.Duration
		expectedCleanTermination bool
	}{
		{
			// The controller stops as soon as its own context is cancelled,
			// well within the generous timeout.
			name: "clean shutdown",
			newController: func(testCtx context.Context) Controller {
				run := func(ctx context.Context) {
					<-ctx.Done()
				}
				return newControllerLoop(run, "controller-A")
			},
			shutdownTimeout:          10 * time.Second,
			expectedCleanTermination: true,
		},
		{
			// The controller ignores its own context and only stops with the
			// test context, so the short timeout must fire first.
			name: "shutdown timeout",
			newController: func(testCtx context.Context) Controller {
				run := func(ctx context.Context) {
					<-testCtx.Done()
				}
				return newControllerLoop(run, "controller-A")
			},
			shutdownTimeout:          50 * time.Millisecond,
			expectedCleanTermination: false,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			_, baseCtx := ktesting.NewTestContext(t)
			controllerCtx := ControllerContext{}
			// testCtx is used to make sure the controller failing to shut down can exit after the test is finished.
			testCtx, cancelTest := context.WithCancel(baseCtx)
			defer cancelTest()
			// ctx is used to wait in the controller for shutdown and also to start the shutdown timeout in RunControllers.
			// To start the shutdown timeout immediately, we start with a cancelled context already.
			ctx, cancelController := context.WithCancel(baseCtx)
			cancelController()
			controllers := []Controller{tc.newController(testCtx)}
			got := RunControllers(ctx, controllerCtx, controllers, 0, tc.shutdownTimeout)
			if got != tc.expectedCleanTermination {
				t.Errorf("expected clean shutdown %v, got %v", tc.expectedCleanTermination, got)
			}
		})
	}
}
// mockHealthCheckAdder is a HealthCheckAdder test double that records every
// health checker registered with it, so tests can inspect what was added.
type mockHealthCheckAdder struct {
	// Checks accumulates all checkers passed to AddHealthChecker, in order.
	Checks []healthz.HealthChecker
}

// AddHealthChecker appends the given checkers to m.Checks.
func (m *mockHealthCheckAdder) AddHealthChecker(checks ...healthz.HealthChecker) {
	m.Checks = append(m.Checks, checks...)
}
// runControllers is a test convenience wrapper: it builds the controllers
// described by controllerDescriptors and then runs them (with zero start
// jitter and no shutdown timeout) until ctx is cancelled.
func runControllers(
	ctx context.Context, controllerCtx ControllerContext,
	controllerDescriptors map[string]*ControllerDescriptor, healthzChecks HealthCheckAdder,
) error {
	built, err := BuildControllers(ctx, controllerCtx, controllerDescriptors, nil, healthzChecks)
	if err != nil {
		return err
	}
	// Return value is ignored on purpose: with no timeout the call only
	// returns after a clean termination.
	RunControllers(ctx, controllerCtx, built, 0, 0)
	return nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -18,6 +18,7 @@ package app
import (
"context"
"sync"
"testing"
"time"
@@ -28,6 +29,8 @@ import (
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
)
// TestClientBuilder inherits ClientBuilder and can accept a given fake clientset.
@@ -35,12 +38,14 @@ type TestClientBuilder struct {
clientset clientset.Interface
}
func (TestClientBuilder) Config(name string) (*restclient.Config, error) { return nil, nil }
func (TestClientBuilder) Config(name string) (*restclient.Config, error) {
return &restclient.Config{}, nil
}
func (TestClientBuilder) ConfigOrDie(name string) *restclient.Config {
return &restclient.Config{}
}
func (TestClientBuilder) Client(name string) (clientset.Interface, error) { return nil, nil }
func (m TestClientBuilder) Client(name string) (clientset.Interface, error) { return m.clientset, nil }
func (m TestClientBuilder) ClientOrDie(name string) clientset.Interface {
return m.clientset
}
@@ -130,26 +135,44 @@ func TestController_DiscoveryError(t *testing.T) {
},
}
for name, test := range tcs {
testDiscovery := FakeDiscoveryWithError{Err: test.discoveryError, PossibleResources: test.possibleResources}
testClientset := NewFakeClientset(testDiscovery)
testClientBuilder := TestClientBuilder{clientset: testClientset}
testInformerFactory := informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1))
ctx := ControllerContext{
ClientBuilder: testClientBuilder,
InformerFactory: testInformerFactory,
ObjectOrMetadataInformerFactory: testInformerFactory,
InformersStarted: make(chan struct{}),
}
for controllerName, controllerDesc := range controllerDescriptorMap {
_, _, err := controllerDesc.GetInitFunc()(context.TODO(), ctx, controllerName)
if test.expectedErr != (err != nil) {
t.Errorf("%v test failed for use case: %v", controllerName, name)
t.Run(name, func(t *testing.T) {
ctx := context.Background()
testDiscovery := FakeDiscoveryWithError{Err: test.discoveryError, PossibleResources: test.possibleResources}
testClientset := NewFakeClientset(testDiscovery)
testClientBuilder := TestClientBuilder{clientset: testClientset}
testInformerFactory := informers.NewSharedInformerFactoryWithOptions(testClientset, time.Duration(1))
controllerContext := ControllerContext{
ClientBuilder: testClientBuilder,
InformerFactory: testInformerFactory,
ObjectOrMetadataInformerFactory: testInformerFactory,
InformersStarted: make(chan struct{}),
}
}
_, _, err := startModifiedNamespaceController(
context.TODO(), ctx, testClientset, testClientBuilder.ConfigOrDie("namespace-controller"))
if test.expectedErr != (err != nil) {
t.Errorf("Namespace Controller test failed for use case: %v", name)
}
for controllerName, controllerDesc := range controllerDescriptorMap {
_, err := controllerDesc.GetControllerConstructor()(ctx, controllerContext, controllerName)
if test.expectedErr != (err != nil) {
t.Errorf("%v test failed for use case: %v", controllerName, name)
}
}
namespaceController, err := newModifiedNamespaceController(
ctx, controllerContext, names.NamespaceController,
testClientset, testClientBuilder.ConfigOrDie("namespace-controller"))
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
namespaceController.Run(ctx)
}()
cancel()
wg.Wait()
if test.expectedErr != (err != nil) {
t.Errorf("Namespace Controller test failed for use case: %v", name)
}
})
}
}

View File

@@ -22,7 +22,6 @@ package app
import (
"context"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice"
endpointslicemirroringcontroller "k8s.io/kubernetes/pkg/controller/endpointslicemirroring"
@@ -30,43 +29,57 @@ import (
func newEndpointSliceControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.EndpointSliceController,
aliases: []string{"endpointslice"},
initFunc: startEndpointSliceController,
name: names.EndpointSliceController,
aliases: []string{"endpointslice"},
constructor: newEndpointSliceController,
}
}
func startEndpointSliceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
go endpointslicecontroller.NewController(
func newEndpointSliceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("endpointslice-controller")
if err != nil {
return nil, err
}
esc := endpointslicecontroller.NewController(
ctx,
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Services(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.InformerFactory.Discovery().V1().EndpointSlices(),
controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"),
client,
controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
).Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs))
return nil, true, nil
)
return newControllerLoop(func(ctx context.Context) {
esc.Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs))
}, controllerName), nil
}
func newEndpointSliceMirroringControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.EndpointSliceMirroringController,
aliases: []string{"endpointslicemirroring"},
initFunc: startEndpointSliceMirroringController,
name: names.EndpointSliceMirroringController,
aliases: []string{"endpointslicemirroring"},
constructor: newEndpointSliceMirroringController,
}
}
func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
go endpointslicemirroringcontroller.NewController(
func newEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("endpointslicemirroring-controller")
if err != nil {
return nil, err
}
esmc := endpointslicemirroringcontroller.NewController(
ctx,
controllerContext.InformerFactory.Core().V1().Endpoints(),
controllerContext.InformerFactory.Discovery().V1().EndpointSlices(),
controllerContext.InformerFactory.Core().V1().Services(),
controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset,
controllerContext.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"),
client,
controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringEndpointUpdatesBatchPeriod.Duration,
).Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs))
return nil, true, nil
)
return newControllerLoop(func(ctx context.Context) {
esmc.Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceMirroringController.MirroringConcurrentServiceEndpointSyncs))
}, controllerName), nil
}

View File

@@ -23,7 +23,6 @@ import (
"context"
"k8s.io/component-base/featuregate"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/servicecidrs"
"k8s.io/kubernetes/pkg/features"
@@ -31,20 +30,28 @@ import (
func newServiceCIDRsControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.ServiceCIDRController,
initFunc: startServiceCIDRsController,
name: names.ServiceCIDRController,
constructor: newServiceCIDRsController,
requiredFeatureGates: []featuregate.Feature{
features.MultiCIDRServiceAllocator,
}}
},
}
}
func startServiceCIDRsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
go servicecidrs.NewController(
func newServiceCIDRsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("service-cidrs-controller")
if err != nil {
return nil, err
}
// TODO use component config
scc := servicecidrs.NewController(
ctx,
controllerContext.InformerFactory.Networking().V1().ServiceCIDRs(),
controllerContext.InformerFactory.Networking().V1().IPAddresses(),
controllerContext.ClientBuilder.ClientOrDie("service-cidrs-controller"),
).Run(ctx, 5)
// TODO use component config
return nil, true, nil
client,
)
return newControllerLoop(func(ctx context.Context) {
scc.Run(ctx, 5)
}, controllerName), nil
}

View File

@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"net"
"time"
v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -108,6 +109,8 @@ type KubeControllerManagerOptions struct {
Master string
ShowHiddenMetricsForVersion string
ControllerShutdownTimeout time.Duration
// ComponentGlobalsRegistry is the registry where the effective versions and feature gates for all components are stored.
ComponentGlobalsRegistry basecompatibility.ComponentGlobalsRegistry
@@ -238,6 +241,8 @@ func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources
s.Generic.LeaderElection.ResourceName = "kube-controller-manager"
s.Generic.LeaderElection.ResourceNamespace = "kube-system"
s.ControllerShutdownTimeout = 10 * time.Second
return &s, nil
}
@@ -298,6 +303,9 @@ func (s *KubeControllerManagerOptions) Flags(allControllers []string, disabledBy
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).")
fs.StringVar(&s.Generic.ClientConnection.Kubeconfig, "kubeconfig", s.Generic.ClientConnection.Kubeconfig, "Path to kubeconfig file with authorization and master location information (the master location can be overridden by the master flag).")
fss.FlagSet("generic").DurationVar(&s.ControllerShutdownTimeout, "controller-shutdown-timeout",
s.ControllerShutdownTimeout, "Time to wait for the controllers to shut down before terminating the executable")
if !utilfeature.DefaultFeatureGate.Enabled(featuregate.Feature(clientgofeaturegate.WatchListClient)) {
ver := version.MustParse("1.34")
if err := utilfeature.DefaultMutableFeatureGate.OverrideDefaultAtVersion(featuregate.Feature(clientgofeaturegate.WatchListClient), true, ver); err != nil {
@@ -413,6 +421,7 @@ func (s *KubeControllerManagerOptions) ApplyTo(c *kubecontrollerconfig.Config, a
return err
}
}
c.ControllerShutdownTimeout = s.ControllerShutdownTimeout
return nil
}
@@ -502,11 +511,12 @@ func (s KubeControllerManagerOptions) Config(ctx context.Context, allControllers
eventRecorder := eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: KubeControllerManagerUserAgent})
c := &kubecontrollerconfig.Config{
Client: client,
Kubeconfig: kubeconfig,
EventBroadcaster: eventBroadcaster,
EventRecorder: eventRecorder,
ComponentGlobalsRegistry: s.ComponentGlobalsRegistry,
Client: client,
Kubeconfig: kubeconfig,
EventBroadcaster: eventBroadcaster,
EventRecorder: eventRecorder,
ControllerShutdownTimeout: s.ControllerShutdownTimeout,
ComponentGlobalsRegistry: s.ComponentGlobalsRegistry,
}
if err := s.ApplyTo(c, allControllers, disabledByDefaultControllers, controllerAliases); err != nil {
return nil, err

View File

@@ -447,9 +447,10 @@ func TestAddFlags(t *testing.T) {
AlwaysAllowPaths: []string{"/healthz", "/readyz", "/livez"}, // note: this does not match /healthz/ or /healthz/*
AlwaysAllowGroups: []string{"system:masters"},
},
Master: "192.168.4.20",
Metrics: &metrics.Options{},
Logs: logs.NewOptions(),
Master: "192.168.4.20",
ControllerShutdownTimeout: 10 * time.Second,
Metrics: &metrics.Options{},
Logs: logs.NewOptions(),
// ignores comparing ComponentGlobalsRegistry in this test.
ComponentGlobalsRegistry: s.ComponentGlobalsRegistry,
}
@@ -722,6 +723,7 @@ func TestApplyTo(t *testing.T) {
ConcurrentPolicySyncs: 9,
},
},
ControllerShutdownTimeout: 10 * time.Second,
}
// Sort GCIgnoredResources because it's built from a map, which means the

View File

@@ -106,7 +106,7 @@ func probeControllerVolumePlugins(logger klog.Logger, config persistentvolumecon
}
if err := AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, &hostPathConfig); err != nil {
logger.Error(err, "Could not create hostpath recycler pod from file", "path", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
return nil, err
}
allPlugins = append(allPlugins, hostpath.ProbeVolumePlugins(hostPathConfig)...)
@@ -117,7 +117,7 @@ func probeControllerVolumePlugins(logger klog.Logger, config persistentvolumecon
}
if err := AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, &nfsConfig); err != nil {
logger.Error(err, "Could not create NFS recycler pod from file", "path", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
return nil, err
}
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)

View File

@@ -29,12 +29,10 @@ import (
type probeFn func() []volume.VolumePlugin
func appendPluginBasedOnFeatureFlags(logger klog.Logger, plugins []volume.VolumePlugin, inTreePluginName string, featureGate featuregate.FeatureGate, pluginInfo pluginInfo) ([]volume.VolumePlugin, error) {
_, err := csimigration.CheckMigrationFeatureFlags(featureGate, pluginInfo.pluginMigrationFeature, pluginInfo.pluginUnregisterFeature)
if err != nil {
logger.Error(err, "Unexpected CSI Migration Feature Flags combination detected. CSI Migration may not take effect")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
// TODO: fail and return here once alpha only tests can set the feature flags for a plugin correctly
return nil, err
}
// Skip appending the in-tree plugin to the list of plugins to be probed/initialized

View File

@@ -21,32 +21,38 @@ package app
import (
"context"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/scale"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/disruption"
)
func newDisruptionControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.DisruptionController,
aliases: []string{"disruption"},
initFunc: startDisruptionController,
name: names.DisruptionController,
aliases: []string{"disruption"},
constructor: newDisruptionController,
}
}
func startDisruptionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
client := controllerContext.ClientBuilder.ClientOrDie("disruption-controller")
config := controllerContext.ClientBuilder.ConfigOrDie("disruption-controller")
func newDisruptionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("disruption-controller")
if err != nil {
return nil, err
}
config, err := controllerContext.NewClientConfig("disruption-controller")
if err != nil {
return nil, err
}
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
scaleClient, err := scale.NewForConfig(config, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, false, err
return nil, err
}
go disruption.NewDisruptionController(
dc := disruption.NewDisruptionController(
ctx,
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Policy().V1().PodDisruptionBudgets(),
@@ -58,6 +64,6 @@ func startDisruptionController(ctx context.Context, controllerContext Controller
controllerContext.RESTMapper,
scaleClient,
client.Discovery(),
).Run(ctx)
return nil, true, nil
)
return newControllerLoop(dc.Run, controllerName), nil
}

View File

@@ -19,23 +19,29 @@ package app
import (
"context"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/clusterroleaggregation"
)
func newClusterRoleAggregrationControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.ClusterRoleAggregationController,
aliases: []string{"clusterrole-aggregation"},
initFunc: startClusterRoleAggregationController,
name: names.ClusterRoleAggregationController,
aliases: []string{"clusterrole-aggregation"},
constructor: newClusterRoleAggregationController,
}
}
func startClusterRoleAggregationController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
go clusterroleaggregation.NewClusterRoleAggregation(
func newClusterRoleAggregationController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
client, err := controllerContext.NewClient("clusterrole-aggregation-controller")
if err != nil {
return nil, err
}
crac := clusterroleaggregation.NewClusterRoleAggregation(
controllerContext.InformerFactory.Rbac().V1().ClusterRoles(),
controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(),
).Run(ctx, 5)
return nil, true, nil
client.RbacV1(),
)
return newControllerLoop(func(ctx context.Context) {
crac.Run(ctx, 5)
}, controllerName), nil
}

View File

@@ -0,0 +1,98 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"context"
"fmt"
"k8s.io/client-go/util/keyutil"
"k8s.io/controller-manager/pkg/clientbuilder"
"k8s.io/klog/v2"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/serviceaccount"
)
// serviceAccountTokenController is special because it must run first to set up permissions for other controllers.
// It cannot use the "normal" client builder, so it tracks its own.
func newServiceAccountTokenControllerDescriptor(rootClientBuilder clientbuilder.ControllerClientBuilder) *ControllerDescriptor {
	// Capture rootClientBuilder in a named constructor closure instead of an
	// inline literal, so the descriptor body stays declarative.
	construct := func(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
		return newServiceAccountTokenController(ctx, controllerContext, controllerName, rootClientBuilder)
	}
	return &ControllerDescriptor{
		name:        names.ServiceAccountTokenController,
		aliases:     []string{"serviceaccount-token"},
		constructor: construct,
		// This controller is started manually before any other controller.
		requiresSpecialHandling: true,
	}
}
// newServiceAccountTokenController builds the tokens controller that mints
// service-account tokens. It returns (nil, nil) — i.e. "disabled" — when no
// private key file is configured. Clients come from rootClientBuilder, not the
// normal builder, because this controller must run before permissions for the
// other controllers exist.
func newServiceAccountTokenController(
	ctx context.Context, controllerContext ControllerContext, controllerName string,
	rootClientBuilder clientbuilder.ControllerClientBuilder,
) (Controller, error) {
	saConfig := controllerContext.ComponentConfig.SAController

	// No signing key means the controller cannot mint tokens: treat as disabled.
	if len(saConfig.ServiceAccountKeyFile) == 0 {
		klog.FromContext(ctx).Info("Controller is disabled because there is no private key", "controller", controllerName)
		return nil, nil
	}

	signingKey, err := keyutil.PrivateKeyFromFile(saConfig.ServiceAccountKeyFile)
	if err != nil {
		return nil, fmt.Errorf("error reading key for service account token controller: %w", err)
	}

	// Choose the root CA to publish: an explicitly configured file wins,
	// otherwise fall back to the CA of the root client configuration.
	var rootCA []byte
	switch {
	case saConfig.RootCAFile != "":
		rootCA, err = readCA(saConfig.RootCAFile)
		if err != nil {
			return nil, fmt.Errorf("error parsing root-ca-file at %s: %w", saConfig.RootCAFile, err)
		}
	default:
		config, err := rootClientBuilder.Config("tokens-controller")
		if err != nil {
			return nil, fmt.Errorf("failed to create Kubernetes client config for %q: %w", "tokens-controller", err)
		}
		rootCA = config.CAData
	}

	client, err := rootClientBuilder.Client("tokens-controller")
	if err != nil {
		return nil, fmt.Errorf("failed to create Kubernetes client for %q: %w", "tokens-controller", err)
	}

	generator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, signingKey)
	if err != nil {
		return nil, fmt.Errorf("failed to build token generator: %w", err)
	}

	tokens, err := serviceaccountcontroller.NewTokensController(
		klog.FromContext(ctx),
		controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
		controllerContext.InformerFactory.Core().V1().Secrets(),
		client,
		serviceaccountcontroller.TokensControllerOptions{
			TokenGenerator: generator,
			RootCA:         rootCA,
		},
	)
	if err != nil {
		return nil, fmt.Errorf("error creating Tokens controller: %w", err)
	}

	return newControllerLoop(func(ctx context.Context) {
		tokens.Run(ctx, int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs))
	}, controllerName), nil
}

View File

@@ -20,82 +20,87 @@ import (
"context"
"fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/metadata"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientgofeaturegate "k8s.io/client-go/features"
"k8s.io/client-go/metadata"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
svm "k8s.io/kubernetes/pkg/controller/storageversionmigrator"
"k8s.io/kubernetes/pkg/features"
)
// newStorageVersionMigratorControllerDescriptor describes the storage version
// migrator ("svm") controller. (The stale `initFunc: startSVMController` diff
// residue is removed; the descriptor uses the constructor-based API.)
func newStorageVersionMigratorControllerDescriptor() *ControllerDescriptor {
	return &ControllerDescriptor{
		name:        names.StorageVersionMigratorController,
		aliases:     []string{"svm"},
		constructor: newSVMController,
	}
}
func startSVMController(
ctx context.Context,
controllerContext ControllerContext,
controllerName string,
) (controller.Interface, bool, error) {
func newSVMController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionMigrator) ||
!clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) {
return nil, false, nil
return nil, nil
}
if !controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
return nil, true, fmt.Errorf("storage version migrator requires garbage collector")
return nil, fmt.Errorf("storage version migrator requires garbage collector")
}
if !clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
err := fmt.Errorf("storage version migrator requires the InOrderInformers feature gate to be enabled")
return nil, true, err
return nil, err
}
// svm controller can make a lot of requests during migration, keep it fast
config := controllerContext.ClientBuilder.ConfigOrDie(controllerName)
config, err := controllerContext.NewClientConfig(controllerName)
if err != nil {
return nil, err
}
config.QPS *= 20
config.Burst *= 100
client := controllerContext.ClientBuilder.ClientOrDie(controllerName)
client, err := controllerContext.NewClient(controllerName)
if err != nil {
return nil, err
}
informer := controllerContext.InformerFactory.Storagemigration().V1alpha1().StorageVersionMigrations()
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, false, err
return nil, err
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, false, err
return nil, err
}
go svm.NewResourceVersionController(
ctx,
client,
discoveryClient,
metadata.NewForConfigOrDie(config),
informer,
controllerContext.RESTMapper,
).Run(ctx)
metaClient, err := metadata.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create metadata client for %s: %w", controllerName, err)
}
svmController := svm.NewSVMController(
ctx,
client,
dynamicClient,
informer,
controllerName,
controllerContext.RESTMapper,
controllerContext.GraphBuilder,
)
go svmController.Run(ctx)
return svmController, true, nil
return newControllerLoop(concurrentRun(
svm.NewSVMController(
ctx,
client,
dynamicClient,
informer,
controllerName,
controllerContext.RESTMapper,
controllerContext.GraphBuilder,
).Run,
svm.NewResourceVersionController(
ctx,
client,
discoveryClient,
metaClient,
informer,
controllerContext.RESTMapper,
).Run,
), controllerName), nil
}

View File

@@ -18,13 +18,13 @@ package app
import (
"context"
"fmt"
apiextensionsscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
pluginvalidatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
"k8s.io/apiserver/pkg/cel/openapi/resolver"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/component-base/featuregate"
"k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/validatingadmissionpolicystatus"
"k8s.io/kubernetes/pkg/generated/openapi"
@@ -33,27 +33,40 @@ import (
// newValidatingAdmissionPolicyStatusControllerDescriptor describes the
// ValidatingAdmissionPolicy status controller. (The stale `initFunc` diff
// residue is removed; the descriptor uses the constructor-based API.)
func newValidatingAdmissionPolicyStatusControllerDescriptor() *ControllerDescriptor {
	return &ControllerDescriptor{
		name:                 names.ValidatingAdmissionPolicyStatusController,
		constructor:          newValidatingAdmissionPolicyStatusController,
		requiredFeatureGates: []featuregate.Feature{},
	}
}
func startValidatingAdmissionPolicyStatusController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
// KCM won't start the controller without the feature gate set.
func newValidatingAdmissionPolicyStatusController(ctx context.Context, controllerContext ControllerContext, controllerName string) (Controller, error) {
discoveryClient, err := controllerContext.ClientBuilder.DiscoveryClient(names.ValidatingAdmissionPolicyStatusController)
if err != nil {
return nil, fmt.Errorf("failed to create discovery client for %s: %w", controllerName, err)
}
schemaResolver := resolver.NewDefinitionsSchemaResolver(openapi.GetOpenAPIDefinitions, k8sscheme.Scheme, apiextensionsscheme.Scheme).
Combine(&resolver.ClientDiscoveryResolver{Discovery: controllerContext.ClientBuilder.DiscoveryClientOrDie(names.ValidatingAdmissionPolicyStatusController)})
Combine(&resolver.ClientDiscoveryResolver{Discovery: discoveryClient})
typeChecker := &pluginvalidatingadmissionpolicy.TypeChecker{
SchemaResolver: schemaResolver,
RestMapper: controllerContext.RESTMapper,
}
client, err := controllerContext.NewClient(names.ValidatingAdmissionPolicyStatusController)
if err != nil {
return nil, err
}
c, err := validatingadmissionpolicystatus.NewController(
controllerContext.InformerFactory.Admissionregistration().V1().ValidatingAdmissionPolicies(),
controllerContext.ClientBuilder.ClientOrDie(names.ValidatingAdmissionPolicyStatusController).AdmissionregistrationV1().ValidatingAdmissionPolicies(),
client.AdmissionregistrationV1().ValidatingAdmissionPolicies(),
typeChecker,
)
if err != nil {
return nil, err
}
go c.Run(ctx, int(controllerContext.ComponentConfig.ValidatingAdmissionPolicyStatusController.ConcurrentPolicySyncs))
return nil, true, err
return newControllerLoop(func(ctx context.Context) {
c.Run(ctx, int(controllerContext.ComponentConfig.ValidatingAdmissionPolicyStatusController.ConcurrentPolicySyncs))
}, controllerName), nil
}