mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 19:23:40 +00:00
Encapsulate KCM controllers with their metadata
- These metadata can be used to handle controllers in a generic way. - This enables showing feature gated controllers in kube-controller-manager's help. - It is possible to obtain a controllerName in the InitFunc so it can be passed down to and used by the controller. metadata about a controller: - name - requiredFeatureGates - isDisabledByDefault - isCloudProviderController
This commit is contained in:
parent
fd5c406112
commit
27a77e0ef3
@ -27,13 +27,20 @@ import (
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/daemon"
|
||||
"k8s.io/kubernetes/pkg/controller/deployment"
|
||||
"k8s.io/kubernetes/pkg/controller/replicaset"
|
||||
"k8s.io/kubernetes/pkg/controller/statefulset"
|
||||
)
|
||||
|
||||
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newDaemonSetControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.DaemonSetController,
|
||||
initFunc: startDaemonSetController,
|
||||
}
|
||||
}
|
||||
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
dsc, err := daemon.NewDaemonSetsController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
|
||||
@ -50,7 +57,13 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newStatefulSetControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.StatefulSetController,
|
||||
initFunc: startStatefulSetController,
|
||||
}
|
||||
}
|
||||
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go statefulset.NewStatefulSetController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
@ -62,7 +75,14 @@ func startStatefulSetController(ctx context.Context, controllerContext Controlle
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newReplicaSetControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.ReplicaSetController,
|
||||
initFunc: startReplicaSetController,
|
||||
}
|
||||
}
|
||||
|
||||
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go replicaset.NewReplicaSetController(
|
||||
klog.FromContext(ctx),
|
||||
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
|
||||
@ -73,7 +93,14 @@ func startReplicaSetController(ctx context.Context, controllerContext Controller
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newDeploymentControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.DeploymentController,
|
||||
initFunc: startDeploymentController,
|
||||
}
|
||||
}
|
||||
|
||||
func startDeploymentController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
dc, err := deployment.NewDeploymentController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Apps().V1().Deployments(),
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/scale"
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/podautoscaler"
|
||||
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
@ -35,11 +36,14 @@ import (
|
||||
"k8s.io/metrics/pkg/client/external_metrics"
|
||||
)
|
||||
|
||||
func startHPAController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
return startHPAControllerWithRESTClient(ctx, controllerContext)
|
||||
func newHorizontalPodAutoscalerControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.HorizontalPodAutoscalerController,
|
||||
initFunc: startHorizontalPodAutoscalerControllerWithRESTClient,
|
||||
}
|
||||
}
|
||||
|
||||
func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func startHorizontalPodAutoscalerControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
|
||||
clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
|
||||
hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
|
||||
|
@ -24,11 +24,19 @@ import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/cronjob"
|
||||
"k8s.io/kubernetes/pkg/controller/job"
|
||||
)
|
||||
|
||||
func startJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newJobControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.JobController,
|
||||
initFunc: startJobController,
|
||||
}
|
||||
}
|
||||
|
||||
func startJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
jobController, err := job.NewController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
@ -42,7 +50,14 @@ func startJobController(ctx context.Context, controllerContext ControllerContext
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newCronJobControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.CronJobController,
|
||||
initFunc: startCronJobController,
|
||||
}
|
||||
}
|
||||
|
||||
func startCronJobController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(),
|
||||
controllerContext.InformerFactory.Batch().V1().CronJobs(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
|
||||
|
@ -21,10 +21,18 @@ import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/bootstrap"
|
||||
)
|
||||
|
||||
func startBootstrapSignerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newBootstrapSignerControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.BootstrapSignerController,
|
||||
initFunc: startBootstrapSignerController,
|
||||
isDisabledByDefault: true,
|
||||
}
|
||||
}
|
||||
func startBootstrapSignerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
bsc, err := bootstrap.NewSigner(
|
||||
controllerContext.ClientBuilder.ClientOrDie("bootstrap-signer"),
|
||||
controllerContext.InformerFactory.Core().V1().Secrets(),
|
||||
@ -38,7 +46,14 @@ func startBootstrapSignerController(ctx context.Context, controllerContext Contr
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startTokenCleanerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newTokenCleanerControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.TokenCleanerController,
|
||||
initFunc: startTokenCleanerController,
|
||||
isDisabledByDefault: true,
|
||||
}
|
||||
}
|
||||
func startTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
tcc, err := bootstrap.NewTokenCleaner(
|
||||
controllerContext.ClientBuilder.ClientOrDie("token-cleaner"),
|
||||
controllerContext.InformerFactory.Core().V1().Secrets(),
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/certificates/approver"
|
||||
"k8s.io/kubernetes/pkg/controller/certificates/cleaner"
|
||||
"k8s.io/kubernetes/pkg/controller/certificates/rootcacertpublisher"
|
||||
@ -32,7 +33,14 @@ import (
|
||||
csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config"
|
||||
)
|
||||
|
||||
func startCSRSigningController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newCertificateSigningRequestSigningControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.CertificateSigningRequestSigningController,
|
||||
initFunc: startCertificateSigningRequestSigningController,
|
||||
}
|
||||
}
|
||||
|
||||
func startCertificateSigningRequestSigningController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
missingSingleSigningFile := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || controllerContext.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == ""
|
||||
if missingSingleSigningFile && !anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
|
||||
@ -148,7 +156,13 @@ func getLegacyUnknownSignerFiles(config csrsigningconfig.CSRSigningControllerCon
|
||||
return config.ClusterSigningCertFile, config.ClusterSigningKeyFile
|
||||
}
|
||||
|
||||
func startCSRApprovingController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newCertificateSigningRequestApprovingControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.CertificateSigningRequestApprovingController,
|
||||
initFunc: startCertificateSigningRequestApprovingController,
|
||||
}
|
||||
}
|
||||
func startCertificateSigningRequestApprovingController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
approver := approver.NewCSRApprovingController(
|
||||
ctx,
|
||||
controllerContext.ClientBuilder.ClientOrDie("certificate-controller"),
|
||||
@ -159,7 +173,13 @@ func startCSRApprovingController(ctx context.Context, controllerContext Controll
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startCSRCleanerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newCertificateSigningRequestCleanerControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.CertificateSigningRequestCleanerController,
|
||||
initFunc: startCertificateSigningRequestCleanerController,
|
||||
}
|
||||
}
|
||||
func startCertificateSigningRequestCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
cleaner := cleaner.NewCSRCleanerController(
|
||||
controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(),
|
||||
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
|
||||
@ -168,7 +188,14 @@ func startCSRCleanerController(ctx context.Context, controllerContext Controller
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startRootCACertPublisher(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newRootCACertificatePublisherControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.RootCACertificatePublisherController,
|
||||
initFunc: startRootCACertificatePublisherController,
|
||||
}
|
||||
}
|
||||
|
||||
func startRootCACertificatePublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
var (
|
||||
rootCA []byte
|
||||
err error
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
@ -35,7 +36,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
genericfeatures "k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/mux"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
@ -51,10 +51,10 @@ import (
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
"k8s.io/client-go/util/keyutil"
|
||||
cloudprovider "k8s.io/cloud-provider"
|
||||
cpnames "k8s.io/cloud-provider/names"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
"k8s.io/component-base/cli/globalflag"
|
||||
"k8s.io/component-base/configz"
|
||||
"k8s.io/component-base/featuregate"
|
||||
"k8s.io/component-base/logs"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
"k8s.io/component-base/metrics/features"
|
||||
@ -70,8 +70,6 @@ import (
|
||||
"k8s.io/controller-manager/pkg/informerfactory"
|
||||
"k8s.io/controller-manager/pkg/leadermigration"
|
||||
"k8s.io/klog/v2"
|
||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
@ -137,7 +135,7 @@ controller, and serviceaccounts controller.`,
|
||||
}
|
||||
cliflag.PrintFlags(cmd.Flags())
|
||||
|
||||
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List(), names.KCMControllerAliases())
|
||||
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault(), names.KCMControllerAliases())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -156,7 +154,7 @@ controller, and serviceaccounts controller.`,
|
||||
}
|
||||
|
||||
fs := cmd.Flags()
|
||||
namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List(), names.KCMControllerAliases())
|
||||
namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault(), names.KCMControllerAliases())
|
||||
verflag.AddFlags(namedFlagSets.FlagSet("global"))
|
||||
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
|
||||
registerLegacyGlobalFlags(namedFlagSets)
|
||||
@ -226,16 +224,16 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
|
||||
|
||||
clientBuilder, rootClientBuilder := createClientBuilders(logger, c)
|
||||
|
||||
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
|
||||
saTokenControllerDescriptor := newServiceAccountTokenControllerDescriptor(rootClientBuilder)
|
||||
|
||||
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
|
||||
run := func(ctx context.Context, startSATokenControllerDescriptor *ControllerDescriptor, controllerDescriptorsFunc ControllerDescriptorsFunc) {
|
||||
controllerContext, err := CreateControllerContext(logger, c, rootClientBuilder, clientBuilder, ctx.Done())
|
||||
if err != nil {
|
||||
logger.Error(err, "Error building controller context")
|
||||
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||
}
|
||||
controllerInitializers := initializersFunc(controllerContext.LoopMode)
|
||||
if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
|
||||
controllerDescriptors := controllerDescriptorsFunc()
|
||||
if err := StartControllers(ctx, controllerContext, startSATokenControllerDescriptor, controllerDescriptors, unsecuredMux, healthzHandler); err != nil {
|
||||
logger.Error(err, "Error starting controllers")
|
||||
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||
}
|
||||
@ -249,7 +247,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
|
||||
|
||||
// No leader election, run directly
|
||||
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
||||
run(ctx, saTokenControllerInitFunc, NewControllerInitializers)
|
||||
run(ctx, saTokenControllerDescriptor, NewControllerDescriptors)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -264,9 +262,6 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
|
||||
// leaderMigrator will be non-nil if and only if Leader Migration is enabled.
|
||||
var leaderMigrator *leadermigration.LeaderMigrator = nil
|
||||
|
||||
// startSATokenController will be original saTokenControllerInitFunc if leader migration is not enabled.
|
||||
startSATokenController := saTokenControllerInitFunc
|
||||
|
||||
// If leader migration is enabled, create the LeaderMigrator and prepare for migration
|
||||
if leadermigration.Enabled(&c.ComponentConfig.Generic) {
|
||||
logger.Info("starting leader migration")
|
||||
@ -274,11 +269,14 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
|
||||
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
|
||||
"kube-controller-manager")
|
||||
|
||||
// Wrap saTokenControllerInitFunc to signal readiness for migration after starting
|
||||
// startSATokenControllerInit is the original InitFunc.
|
||||
startSATokenControllerInit := saTokenControllerDescriptor.GetInitFunc()
|
||||
|
||||
// Wrap saTokenControllerDescriptor to signal readiness for migration after starting
|
||||
// the controller.
|
||||
startSATokenController = func(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
saTokenControllerDescriptor.initFunc = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
defer close(leaderMigrator.MigrationReady)
|
||||
return saTokenControllerInitFunc(ctx, controllerContext)
|
||||
return startSATokenControllerInit(ctx, controllerContext, controllerName)
|
||||
}
|
||||
}
|
||||
|
||||
@ -288,14 +286,14 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
|
||||
c.ComponentConfig.Generic.LeaderElection.ResourceName,
|
||||
leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(ctx context.Context) {
|
||||
initializersFunc := NewControllerInitializers
|
||||
initializersFunc := NewControllerDescriptors
|
||||
if leaderMigrator != nil {
|
||||
// If leader migration is enabled, we should start only non-migrated controllers
|
||||
// for the main lock.
|
||||
initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
|
||||
initializersFunc = createFilteredControllerDescriptorsFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
|
||||
logger.Info("leader migration: starting main controllers.")
|
||||
}
|
||||
run(ctx, startSATokenController, initializersFunc)
|
||||
run(ctx, saTokenControllerDescriptor, initializersFunc)
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
logger.Error(nil, "leaderelection lost")
|
||||
@ -319,7 +317,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
|
||||
OnStartedLeading: func(ctx context.Context) {
|
||||
logger.Info("leader migration: starting migrated controllers.")
|
||||
// DO NOT start saTokenController under migration lock
|
||||
run(ctx, nil, createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
|
||||
run(ctx, nil, createFilteredControllerDescriptorsFunc(leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
logger.Error(nil, "migration leaderelection lost")
|
||||
@ -377,8 +375,12 @@ type ControllerContext struct {
|
||||
}
|
||||
|
||||
// IsControllerEnabled checks if the context's controllers enabled or not
|
||||
func (c ControllerContext) IsControllerEnabled(name string) bool {
|
||||
return genericcontrollermanager.IsControllerEnabled(name, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers)
|
||||
func (c ControllerContext) IsControllerEnabled(controllerDescriptor *ControllerDescriptor) bool {
|
||||
controllersDisabledByDefault := sets.NewString()
|
||||
if controllerDescriptor.IsDisabledByDefault() {
|
||||
controllersDisabledByDefault.Insert(controllerDescriptor.Name())
|
||||
}
|
||||
return genericcontrollermanager.IsControllerEnabled(controllerDescriptor.Name(), controllersDisabledByDefault, c.ComponentConfig.Generic.Controllers)
|
||||
}
|
||||
|
||||
// InitFunc is used to launch a particular controller. It returns a controller
|
||||
@ -388,101 +390,141 @@ func (c ControllerContext) IsControllerEnabled(name string) bool {
|
||||
// that requests no additional features from the controller manager.
|
||||
// Any error returned will cause the controller process to `Fatal`
|
||||
// The bool indicates whether the controller was enabled.
|
||||
type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller controller.Interface, enabled bool, err error)
|
||||
type InitFunc func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller controller.Interface, enabled bool, err error)
|
||||
|
||||
// ControllerInitializersFunc is used to create a collection of initializers
|
||||
type ControllerDescriptor struct {
|
||||
name string
|
||||
initFunc InitFunc
|
||||
requiredFeatureGates []featuregate.Feature
|
||||
isDisabledByDefault bool
|
||||
isCloudProviderController bool
|
||||
}
|
||||
|
||||
func (r *ControllerDescriptor) Name() string {
|
||||
return r.name
|
||||
}
|
||||
|
||||
func (r *ControllerDescriptor) GetInitFunc() InitFunc {
|
||||
return r.initFunc
|
||||
}
|
||||
|
||||
func (r *ControllerDescriptor) GetRequiredFeatureGates() []featuregate.Feature {
|
||||
return append([]featuregate.Feature(nil), r.requiredFeatureGates...)
|
||||
}
|
||||
|
||||
func (r *ControllerDescriptor) IsDisabledByDefault() bool {
|
||||
return r.isDisabledByDefault
|
||||
}
|
||||
|
||||
func (r *ControllerDescriptor) IsCloudProviderController() bool {
|
||||
return r.isCloudProviderController
|
||||
}
|
||||
|
||||
// ControllerDescriptorsFunc is used to create a collection of controller descriptors
|
||||
// given the loopMode.
|
||||
type ControllerInitializersFunc func(loopMode ControllerLoopMode) (initializers map[string]InitFunc)
|
||||
type ControllerDescriptorsFunc func() (initializers map[string]*ControllerDescriptor)
|
||||
|
||||
var _ ControllerInitializersFunc = NewControllerInitializers
|
||||
var _ ControllerDescriptorsFunc = NewControllerDescriptors
|
||||
|
||||
// KnownControllers returns all known controllers's name
|
||||
func KnownControllers() []string {
|
||||
ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
|
||||
ret := sets.StringKeySet(NewControllerDescriptors())
|
||||
|
||||
// add "special" controllers that aren't initialized normally. These controllers cannot be initialized
|
||||
// using a normal function. The only known special case is the SA token controller which *must* be started
|
||||
// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding
|
||||
// to this list.
|
||||
ret.Insert(
|
||||
names.ServiceAccountTokenController,
|
||||
newServiceAccountTokenControllerDescriptor(nil).Name(),
|
||||
)
|
||||
|
||||
return ret.List()
|
||||
}
|
||||
|
||||
// ControllersDisabledByDefault is the set of controllers which is disabled by default
|
||||
var ControllersDisabledByDefault = sets.NewString(
|
||||
names.BootstrapSignerController,
|
||||
names.TokenCleanerController,
|
||||
)
|
||||
func ControllersDisabledByDefault() []string {
|
||||
var controllersDisabledByDefault []string
|
||||
|
||||
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
|
||||
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
|
||||
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
|
||||
controllers := map[string]InitFunc{}
|
||||
for name, c := range NewControllerDescriptors() {
|
||||
if c.IsDisabledByDefault() {
|
||||
controllersDisabledByDefault = append(controllersDisabledByDefault, name)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(controllersDisabledByDefault)
|
||||
|
||||
return controllersDisabledByDefault
|
||||
}
|
||||
|
||||
// NewControllerDescriptors is a public map of named controller groups (you can start more than one in an init func)
|
||||
// paired to their ControllerDescriptor wrapper object that includes InitFunc.
|
||||
// This allows for structured downstream composition and subdivision.
|
||||
func NewControllerDescriptors() map[string]*ControllerDescriptor {
|
||||
controllers := map[string]*ControllerDescriptor{}
|
||||
|
||||
// All of the controllers must have unique names, or else we will explode.
|
||||
register := func(name string, fn InitFunc) {
|
||||
register := func(controllerDesc *ControllerDescriptor) {
|
||||
if controllerDesc == nil {
|
||||
panic("received nil controller for a registration")
|
||||
}
|
||||
name := controllerDesc.Name()
|
||||
if len(name) == 0 {
|
||||
panic("received controller without a name for a registration")
|
||||
}
|
||||
if _, found := controllers[name]; found {
|
||||
panic(fmt.Sprintf("controller name %q was registered twice", name))
|
||||
}
|
||||
controllers[name] = fn
|
||||
if controllerDesc.GetInitFunc() == nil {
|
||||
panic("received controller without an init function for a registration")
|
||||
}
|
||||
controllers[name] = controllerDesc
|
||||
}
|
||||
|
||||
register(names.EndpointsController, startEndpointController)
|
||||
register(names.EndpointSliceController, startEndpointSliceController)
|
||||
register(names.EndpointSliceMirroringController, startEndpointSliceMirroringController)
|
||||
register(names.ReplicationControllerController, startReplicationController)
|
||||
register(names.PodGarbageCollectorController, startPodGCController)
|
||||
register(names.ResourceQuotaController, startResourceQuotaController)
|
||||
register(names.NamespaceController, startNamespaceController)
|
||||
register(names.ServiceAccountController, startServiceAccountController)
|
||||
register(names.GarbageCollectorController, startGarbageCollectorController)
|
||||
register(names.DaemonSetController, startDaemonSetController)
|
||||
register(names.JobController, startJobController)
|
||||
register(names.DeploymentController, startDeploymentController)
|
||||
register(names.ReplicaSetController, startReplicaSetController)
|
||||
register(names.HorizontalPodAutoscalerController, startHPAController)
|
||||
register(names.DisruptionController, startDisruptionController)
|
||||
register(names.StatefulSetController, startStatefulSetController)
|
||||
register(names.CronJobController, startCronJobController)
|
||||
register(names.CertificateSigningRequestSigningController, startCSRSigningController)
|
||||
register(names.CertificateSigningRequestApprovingController, startCSRApprovingController)
|
||||
register(names.CertificateSigningRequestCleanerController, startCSRCleanerController)
|
||||
register(names.TTLController, startTTLController)
|
||||
register(names.BootstrapSignerController, startBootstrapSignerController)
|
||||
register(names.TokenCleanerController, startTokenCleanerController)
|
||||
register(names.NodeIpamController, startNodeIpamController)
|
||||
register(names.NodeLifecycleController, startNodeLifecycleController)
|
||||
if loopMode == IncludeCloudLoops {
|
||||
register(cpnames.ServiceLBController, startServiceController)
|
||||
register(cpnames.NodeRouteController, startRouteController)
|
||||
register(cpnames.CloudNodeLifecycleController, startCloudNodeLifecycleController)
|
||||
// TODO: persistent volume controllers into the IncludeCloudLoops only set.
|
||||
}
|
||||
register(names.PersistentVolumeBinderController, startPersistentVolumeBinderController)
|
||||
register(names.PersistentVolumeAttachDetachController, startAttachDetachController)
|
||||
register(names.PersistentVolumeExpanderController, startVolumeExpandController)
|
||||
register(names.ClusterRoleAggregationController, startClusterRoleAggregrationController)
|
||||
register(names.PersistentVolumeClaimProtectionController, startPVCProtectionController)
|
||||
register(names.PersistentVolumeProtectionController, startPVProtectionController)
|
||||
register(names.TTLAfterFinishedController, startTTLAfterFinishedController)
|
||||
register(names.RootCACertificatePublisherController, startRootCACertPublisher)
|
||||
register(names.EphemeralVolumeController, startEphemeralVolumeController)
|
||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
|
||||
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
|
||||
register(names.StorageVersionGarbageCollectorController, startStorageVersionGCController)
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
|
||||
register(names.ResourceClaimController, startResourceClaimController)
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LegacyServiceAccountTokenCleanUp) {
|
||||
register(names.LegacyServiceAccountTokenCleanerController, startLegacySATokenCleaner)
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidatingAdmissionPolicy) {
|
||||
register(names.ValidatingAdmissionPolicyStatusController, startValidatingAdmissionPolicyStatusController)
|
||||
}
|
||||
register(newEndpointsControllerDescriptor())
|
||||
register(newEndpointSliceControllerDescriptor())
|
||||
register(newEndpointSliceMirroringControllerDescriptor())
|
||||
register(newReplicationControllerDescriptor())
|
||||
register(newPodGarbageCollectorControllerDescriptor())
|
||||
register(newResourceQuotaControllerDescriptor())
|
||||
register(newNamespaceControllerDescriptor())
|
||||
register(newServiceAccountControllerDescriptor())
|
||||
register(newGarbageCollectorControllerDescriptor())
|
||||
register(newDaemonSetControllerDescriptor())
|
||||
register(newJobControllerDescriptor())
|
||||
register(newDeploymentControllerDescriptor())
|
||||
register(newReplicaSetControllerDescriptor())
|
||||
register(newHorizontalPodAutoscalerControllerDescriptor())
|
||||
register(newDisruptionControllerDescriptor())
|
||||
register(newStatefulSetControllerDescriptor())
|
||||
register(newCronJobControllerDescriptor())
|
||||
register(newCertificateSigningRequestSigningControllerDescriptor())
|
||||
register(newCertificateSigningRequestApprovingControllerDescriptor())
|
||||
register(newCertificateSigningRequestCleanerControllerDescriptor())
|
||||
register(newTTLControllerDescriptor())
|
||||
register(newBootstrapSignerControllerDescriptor())
|
||||
register(newTokenCleanerControllerDescriptor())
|
||||
register(newNodeIpamControllerDescriptor())
|
||||
register(newNodeLifecycleControllerDescriptor())
|
||||
|
||||
register(newServiceLBControllerDescriptor()) // cloud provider controller
|
||||
register(newNodeRouteControllerDescriptor()) // cloud provider controller
|
||||
register(newCloudNodeLifecycleControllerDescriptor()) // cloud provider controller
|
||||
// TODO: persistent volume controllers into the IncludeCloudLoops only set as a cloud provider controller.
|
||||
|
||||
register(newPersistentVolumeBinderControllerDescriptor())
|
||||
register(newPersistentVolumeAttachDetachControllerDescriptor())
|
||||
register(newPersistentVolumeExpanderControllerDescriptor())
|
||||
register(newClusterRoleAggregrationControllerDescriptor())
|
||||
register(newPersistentVolumeClaimProtectionControllerDescriptor())
|
||||
register(newPersistentVolumeProtectionControllerDescriptor())
|
||||
register(newTTLAfterFinishedControllerDescriptor())
|
||||
register(newRootCACertificatePublisherControllerDescriptor())
|
||||
register(newEphemeralVolumeControllerDescriptor())
|
||||
|
||||
// feature gated
|
||||
register(newStorageVersionGarbageCollectorControllerDescriptor())
|
||||
register(newResourceClaimControllerDescriptor())
|
||||
register(newLegacyServiceAccountTokenCleanerControllerDescriptor())
|
||||
register(newValidatingAdmissionPolicyStatusControllerDescriptor())
|
||||
|
||||
return controllers
|
||||
}
|
||||
@ -542,15 +584,20 @@ func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, root
|
||||
}
|
||||
|
||||
// StartControllers starts a set of controllers with a specified ControllerContext
|
||||
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
|
||||
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenControllerDescriptor *ControllerDescriptor, controllerDescriptors map[string]*ControllerDescriptor,
|
||||
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
|
||||
// If this fails, just return here and fail since other controllers won't be able to get credentials.
|
||||
if startSATokenController != nil {
|
||||
if _, _, err := startSATokenController(ctx, controllerCtx); err != nil {
|
||||
return err
|
||||
if startSATokenControllerDescriptor != nil {
|
||||
if !controllerCtx.IsControllerEnabled(startSATokenControllerDescriptor) {
|
||||
logger.Info("Warning: controller is disabled", "controller", startSATokenControllerDescriptor.Name())
|
||||
} else {
|
||||
initFunc := startSATokenControllerDescriptor.GetInitFunc()
|
||||
if _, _, err := initFunc(ctx, controllerCtx, startSATokenControllerDescriptor.Name()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -571,8 +618,25 @@ func StartControllers(ctx context.Context, controllerCtx ControllerContext, star
|
||||
// so we cannot rely on it yet to add the name
|
||||
// - it allows distinguishing between log entries emitted by the controller
|
||||
// and those emitted for it - this is a bit debatable and could be revised.
|
||||
for controllerName, initFn := range controllers {
|
||||
if !controllerCtx.IsControllerEnabled(controllerName) {
|
||||
for controllerName, controllerDesc := range controllerDescriptors {
|
||||
disabledByFeatureGate := false
|
||||
for _, featureGate := range controllerDesc.GetRequiredFeatureGates() {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(featureGate) {
|
||||
disabledByFeatureGate = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if disabledByFeatureGate {
|
||||
logger.Info("Controller is disabled by a feature gate", "controller", controllerName, "requiredFeatureGates", controllerDesc.GetRequiredFeatureGates())
|
||||
continue
|
||||
}
|
||||
|
||||
if controllerDesc.IsCloudProviderController() && controllerCtx.LoopMode != IncludeCloudLoops {
|
||||
logger.Info("Skipping a cloud provider controller", "controller", controllerName, "loopMode", controllerCtx.LoopMode)
|
||||
continue
|
||||
}
|
||||
|
||||
if !controllerCtx.IsControllerEnabled(controllerDesc) {
|
||||
logger.Info("Warning: controller is disabled", "controller", controllerName)
|
||||
continue
|
||||
}
|
||||
@ -580,7 +644,9 @@ func StartControllers(ctx context.Context, controllerCtx ControllerContext, star
|
||||
time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
|
||||
logger.V(1).Info("Starting controller", "controller", controllerName)
|
||||
ctrl, started, err := initFn(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx)
|
||||
|
||||
initFunc := controllerDesc.GetInitFunc()
|
||||
ctrl, started, err := initFunc(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx, controllerName)
|
||||
if err != nil {
|
||||
logger.Error(err, "Error starting controller", "controller", controllerName)
|
||||
return err
|
||||
@ -618,20 +684,20 @@ func StartControllers(ctx context.Context, controllerCtx ControllerContext, star
|
||||
|
||||
// serviceAccountTokenControllerStarter is special because it must run first to set up permissions for other controllers.
|
||||
// It cannot use the "normal" client builder, so it tracks its own. It must also avoid being included in the "normal"
|
||||
// init map so that it can always run first.
|
||||
type serviceAccountTokenControllerStarter struct {
|
||||
rootClientBuilder clientbuilder.ControllerClientBuilder
|
||||
// ControllerDescriptor map so that it can always run first.
|
||||
func newServiceAccountTokenControllerDescriptor(rootClientBuilder clientbuilder.ControllerClientBuilder) *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.ServiceAccountTokenController,
|
||||
initFunc: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
return startServiceAccountTokenController(ctx, controllerContext, controllerName, rootClientBuilder)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext, controllerName string, rootClientBuilder clientbuilder.ControllerClientBuilder) (controller.Interface, bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
if !controllerContext.IsControllerEnabled(names.ServiceAccountTokenController) {
|
||||
logger.Info("Warning: controller is disabled", "controller", names.ServiceAccountTokenController)
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
|
||||
logger.Info("Controller is disabled because there is no private key", "controller", names.ServiceAccountTokenController)
|
||||
logger.Info("Controller is disabled because there is no private key", "controller", controllerName)
|
||||
return nil, false, nil
|
||||
}
|
||||
privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile)
|
||||
@ -645,7 +711,7 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController
|
||||
return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err)
|
||||
}
|
||||
} else {
|
||||
rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
|
||||
rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
|
||||
}
|
||||
|
||||
tokenGenerator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey)
|
||||
@ -655,7 +721,7 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController
|
||||
tokenController, err := serviceaccountcontroller.NewTokensController(
|
||||
controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
|
||||
controllerContext.InformerFactory.Core().V1().Secrets(),
|
||||
c.rootClientBuilder.ClientOrDie("tokens-controller"),
|
||||
rootClientBuilder.ClientOrDie("tokens-controller"),
|
||||
serviceaccountcontroller.TokensControllerOptions{
|
||||
TokenGenerator: tokenGenerator,
|
||||
RootCA: rootCA,
|
||||
@ -737,16 +803,16 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// createInitializersFunc creates a initializersFunc that returns all initializer
|
||||
// createFilteredControllerDescriptorsFunc creates a controllerDescriptorsFunc that returns all controllerDescriptors
|
||||
// with expected as the result after filtering through filterFunc.
|
||||
func createInitializersFunc(filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) ControllerInitializersFunc {
|
||||
return func(loopMode ControllerLoopMode) map[string]InitFunc {
|
||||
initializers := make(map[string]InitFunc)
|
||||
for name, initializer := range NewControllerInitializers(loopMode) {
|
||||
func createFilteredControllerDescriptorsFunc(filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) ControllerDescriptorsFunc {
|
||||
return func() map[string]*ControllerDescriptor {
|
||||
controllerDescriptors := make(map[string]*ControllerDescriptor)
|
||||
for name, controllerDesc := range NewControllerDescriptors() {
|
||||
if filterFunc(name) == expected {
|
||||
initializers[name] = initializer
|
||||
controllerDescriptors[name] = controllerDesc
|
||||
}
|
||||
}
|
||||
return initializers
|
||||
return controllerDescriptors
|
||||
}
|
||||
}
|
||||
|
@ -83,6 +83,7 @@ func TestControllerNamesDeclaration(t *testing.T) {
|
||||
names.StorageVersionGarbageCollectorController,
|
||||
names.ResourceClaimController,
|
||||
names.LegacyServiceAccountTokenCleanerController,
|
||||
names.ValidatingAdmissionPolicyStatusController,
|
||||
)
|
||||
|
||||
for _, name := range KnownControllers() {
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
genericfeatures "k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/quota/v1/generic"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
@ -39,8 +40,11 @@ import (
|
||||
cloudnodelifecyclecontroller "k8s.io/cloud-provider/controllers/nodelifecycle"
|
||||
routecontroller "k8s.io/cloud-provider/controllers/route"
|
||||
servicecontroller "k8s.io/cloud-provider/controllers/service"
|
||||
cpnames "k8s.io/cloud-provider/names"
|
||||
"k8s.io/component-base/featuregate"
|
||||
"k8s.io/controller-manager/controller"
|
||||
csitrans "k8s.io/csi-translation-lib"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
pkgcontroller "k8s.io/kubernetes/pkg/controller"
|
||||
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector"
|
||||
@ -63,6 +67,7 @@ import (
|
||||
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/pvcprotection"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/pvprotection"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
|
||||
"k8s.io/kubernetes/pkg/volume/csimigration"
|
||||
"k8s.io/utils/clock"
|
||||
@ -76,7 +81,15 @@ const (
|
||||
defaultNodeMaskCIDRIPv6 = 64
|
||||
)
|
||||
|
||||
func startServiceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newServiceLBControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: cpnames.ServiceLBController,
|
||||
initFunc: startServiceLBController,
|
||||
isCloudProviderController: true,
|
||||
}
|
||||
}
|
||||
|
||||
func startServiceLBController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
serviceController, err := servicecontroller.New(
|
||||
controllerContext.Cloud,
|
||||
controllerContext.ClientBuilder.ClientOrDie("service-controller"),
|
||||
@ -93,8 +106,14 @@ func startServiceController(ctx context.Context, controllerContext ControllerCon
|
||||
go serviceController.Run(ctx, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs), controllerContext.ControllerManagerMetrics)
|
||||
return nil, true, nil
|
||||
}
|
||||
func newNodeIpamControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.NodeIpamController,
|
||||
initFunc: startNodeIpamController,
|
||||
}
|
||||
}
|
||||
|
||||
func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func startNodeIpamController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
var serviceCIDR *net.IPNet
|
||||
var secondaryServiceCIDR *net.IPNet
|
||||
logger := klog.FromContext(ctx)
|
||||
@ -166,7 +185,14 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newNodeLifecycleControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.NodeLifecycleController,
|
||||
initFunc: startNodeLifecycleController,
|
||||
}
|
||||
}
|
||||
|
||||
func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Coordination().V1().Leases(),
|
||||
@ -190,7 +216,15 @@ func startNodeLifecycleController(ctx context.Context, controllerContext Control
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newCloudNodeLifecycleControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: cpnames.CloudNodeLifecycleController,
|
||||
initFunc: startCloudNodeLifecycleController,
|
||||
isCloudProviderController: true,
|
||||
}
|
||||
}
|
||||
|
||||
func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
@ -210,7 +244,15 @@ func startCloudNodeLifecycleController(ctx context.Context, controllerContext Co
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startRouteController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newNodeRouteControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: cpnames.NodeRouteController,
|
||||
initFunc: startNodeRouteController,
|
||||
isCloudProviderController: true,
|
||||
}
|
||||
}
|
||||
|
||||
func startNodeRouteController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
|
||||
logger.Info("Will not configure cloud provider routes for allocate-node-cidrs", "CIDRs", controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, "routes", controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
|
||||
@ -240,7 +282,14 @@ func startRouteController(ctx context.Context, controllerContext ControllerConte
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newPersistentVolumeBinderControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.PersistentVolumeBinderController,
|
||||
initFunc: startPersistentVolumeBinderController,
|
||||
}
|
||||
}
|
||||
|
||||
func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
plugins, err := ProbeControllerVolumePlugins(logger, controllerContext.Cloud, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
||||
if err != nil {
|
||||
@ -268,7 +317,14 @@ func startPersistentVolumeBinderController(ctx context.Context, controllerContex
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startAttachDetachController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newPersistentVolumeAttachDetachControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.PersistentVolumeAttachDetachController,
|
||||
initFunc: startPersistentVolumeAttachDetachController,
|
||||
}
|
||||
}
|
||||
|
||||
func startPersistentVolumeAttachDetachController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes()
|
||||
csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers()
|
||||
@ -304,7 +360,14 @@ func startAttachDetachController(ctx context.Context, controllerContext Controll
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startVolumeExpandController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newPersistentVolumeExpanderControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.PersistentVolumeExpanderController,
|
||||
initFunc: startPersistentVolumeExpanderController,
|
||||
}
|
||||
}
|
||||
|
||||
func startPersistentVolumeExpanderController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
plugins, err := ProbeExpandableVolumePlugins(logger, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
||||
if err != nil {
|
||||
@ -326,10 +389,16 @@ func startVolumeExpandController(ctx context.Context, controllerContext Controll
|
||||
}
|
||||
go expandController.Run(ctx)
|
||||
return nil, true, nil
|
||||
|
||||
}
|
||||
|
||||
func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newEphemeralVolumeControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.EphemeralVolumeController,
|
||||
initFunc: startEphemeralVolumeController,
|
||||
}
|
||||
}
|
||||
|
||||
func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
ephemeralController, err := ephemeral.NewController(
|
||||
controllerContext.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
@ -343,7 +412,17 @@ func startEphemeralVolumeController(ctx context.Context, controllerContext Contr
|
||||
|
||||
const defaultResourceClaimControllerWorkers = 10
|
||||
|
||||
func startResourceClaimController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newResourceClaimControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.ResourceClaimController,
|
||||
initFunc: startResourceClaimController,
|
||||
requiredFeatureGates: []featuregate.Feature{
|
||||
features.DynamicResourceAllocation,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func startResourceClaimController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
ephemeralController, err := resourceclaim.NewController(
|
||||
klog.FromContext(ctx),
|
||||
controllerContext.ClientBuilder.ClientOrDie("resource-claim-controller"),
|
||||
@ -358,18 +437,32 @@ func startResourceClaimController(ctx context.Context, controllerContext Control
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startEndpointController(ctx context.Context, controllerCtx ControllerContext) (controller.Interface, bool, error) {
|
||||
func newEndpointsControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.EndpointsController,
|
||||
initFunc: startEndpointsController,
|
||||
}
|
||||
}
|
||||
|
||||
func startEndpointsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go endpointcontroller.NewEndpointController(
|
||||
controllerCtx.InformerFactory.Core().V1().Pods(),
|
||||
controllerCtx.InformerFactory.Core().V1().Services(),
|
||||
controllerCtx.InformerFactory.Core().V1().Endpoints(),
|
||||
controllerCtx.ClientBuilder.ClientOrDie("endpoint-controller"),
|
||||
controllerCtx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
|
||||
).Run(ctx, int(controllerCtx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs))
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
controllerContext.InformerFactory.Core().V1().Services(),
|
||||
controllerContext.InformerFactory.Core().V1().Endpoints(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("endpoint-controller"),
|
||||
controllerContext.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
|
||||
).Run(ctx, int(controllerContext.ComponentConfig.EndpointController.ConcurrentEndpointSyncs))
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newReplicationControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.ReplicationControllerController,
|
||||
initFunc: startReplicationController,
|
||||
}
|
||||
}
|
||||
|
||||
func startReplicationController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go replicationcontroller.NewReplicationManager(
|
||||
klog.FromContext(ctx),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
@ -380,7 +473,14 @@ func startReplicationController(ctx context.Context, controllerContext Controlle
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startPodGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newPodGarbageCollectorControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.PodGarbageCollectorController,
|
||||
initFunc: startPodGarbageCollectorController,
|
||||
}
|
||||
}
|
||||
|
||||
func startPodGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go podgc.NewPodGC(
|
||||
ctx,
|
||||
controllerContext.ClientBuilder.ClientOrDie("pod-garbage-collector"),
|
||||
@ -391,7 +491,14 @@ func startPodGCController(ctx context.Context, controllerContext ControllerConte
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newResourceQuotaControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.ResourceQuotaController,
|
||||
initFunc: startResourceQuotaController,
|
||||
}
|
||||
}
|
||||
|
||||
func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
|
||||
resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
|
||||
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
|
||||
@ -422,7 +529,14 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startNamespaceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newNamespaceControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.NamespaceController,
|
||||
initFunc: startNamespaceController,
|
||||
}
|
||||
}
|
||||
|
||||
func startNamespaceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
|
||||
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
|
||||
// including events), takes ~10 seconds by default.
|
||||
@ -456,7 +570,14 @@ func startModifiedNamespaceController(ctx context.Context, controllerContext Con
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startServiceAccountController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newServiceAccountControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.ServiceAccountController,
|
||||
initFunc: startServiceAccountController,
|
||||
}
|
||||
}
|
||||
|
||||
func startServiceAccountController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
sac, err := serviceaccountcontroller.NewServiceAccountsController(
|
||||
controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
|
||||
controllerContext.InformerFactory.Core().V1().Namespaces(),
|
||||
@ -470,7 +591,14 @@ func startServiceAccountController(ctx context.Context, controllerContext Contro
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startTTLController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newTTLControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.TTLController,
|
||||
initFunc: startTTLController,
|
||||
}
|
||||
}
|
||||
|
||||
func startTTLController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go ttlcontroller.NewTTLController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||
@ -479,7 +607,14 @@ func startTTLController(ctx context.Context, controllerContext ControllerContext
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startGarbageCollectorController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newGarbageCollectorControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.GarbageCollectorController,
|
||||
initFunc: startGarbageCollectorController,
|
||||
}
|
||||
}
|
||||
|
||||
func startGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
if !controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
|
||||
return nil, false, nil
|
||||
}
|
||||
@ -523,7 +658,14 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
|
||||
return garbageCollector, true, nil
|
||||
}
|
||||
|
||||
func startPVCProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newPersistentVolumeClaimProtectionControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.PersistentVolumeClaimProtectionController,
|
||||
initFunc: startPersistentVolumeClaimProtectionController,
|
||||
}
|
||||
}
|
||||
|
||||
func startPersistentVolumeClaimProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
|
||||
klog.FromContext(ctx),
|
||||
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
@ -537,7 +679,14 @@ func startPVCProtectionController(ctx context.Context, controllerContext Control
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startPVProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newPersistentVolumeProtectionControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.PersistentVolumeProtectionController,
|
||||
initFunc: startPersistentVolumeProtectionController,
|
||||
}
|
||||
}
|
||||
|
||||
func startPersistentVolumeProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go pvprotection.NewPVProtectionController(
|
||||
klog.FromContext(ctx),
|
||||
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
@ -546,7 +695,14 @@ func startPVProtectionController(ctx context.Context, controllerContext Controll
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newTTLAfterFinishedControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.TTLAfterFinishedController,
|
||||
initFunc: startTTLAfterFinishedController,
|
||||
}
|
||||
}
|
||||
|
||||
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go ttlafterfinished.New(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Batch().V1().Jobs(),
|
||||
@ -555,7 +711,17 @@ func startTTLAfterFinishedController(ctx context.Context, controllerContext Cont
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startLegacySATokenCleaner(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newLegacyServiceAccountTokenCleanerControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.LegacyServiceAccountTokenCleanerController,
|
||||
initFunc: startLegacyServiceAccountTokenCleanerController,
|
||||
requiredFeatureGates: []featuregate.Feature{
|
||||
features.LegacyServiceAccountTokenCleanUp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func startLegacyServiceAccountTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
cleanUpPeriod := controllerContext.ComponentConfig.LegacySATokenCleaner.CleanUpPeriod.Duration
|
||||
legacySATokenCleaner, err := serviceaccountcontroller.NewLegacySATokenCleaner(
|
||||
controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
|
||||
@ -690,7 +856,18 @@ func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, cl
|
||||
return sortedSizes(ipv4Mask, ipv6Mask), nil
|
||||
}
|
||||
|
||||
func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newStorageVersionGarbageCollectorControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.StorageVersionGarbageCollectorController,
|
||||
initFunc: startStorageVersionGarbageCollectorController,
|
||||
requiredFeatureGates: []featuregate.Feature{
|
||||
genericfeatures.APIServerIdentity,
|
||||
genericfeatures.StorageVersionAPI,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func startStorageVersionGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go storageversiongc.NewStorageVersionGC(
|
||||
ctx,
|
||||
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
|
||||
|
@ -28,7 +28,6 @@ import (
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
fakeclientset "k8s.io/client-go/kubernetes/fake"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/controller-manager/controller"
|
||||
)
|
||||
|
||||
// TestClientBuilder inherits ClientBuilder and can accept a given fake clientset.
|
||||
@ -105,15 +104,13 @@ func possibleDiscoveryResource() []*metav1.APIResourceList {
|
||||
}
|
||||
}
|
||||
|
||||
type controllerInitFunc func(context.Context, ControllerContext) (controller.Interface, bool, error)
|
||||
|
||||
func TestController_DiscoveryError(t *testing.T) {
|
||||
controllerInitFuncMap := map[string]controllerInitFunc{
|
||||
"ResourceQuotaController": startResourceQuotaController,
|
||||
"GarbageCollectorController": startGarbageCollectorController,
|
||||
"EndpointSliceController": startEndpointSliceController,
|
||||
"EndpointSliceMirroringController": startEndpointSliceMirroringController,
|
||||
"PodDisruptionBudgetController": startDisruptionController,
|
||||
controllerDescriptorMap := map[string]*ControllerDescriptor{
|
||||
"ResourceQuotaController": newResourceQuotaControllerDescriptor(),
|
||||
"GarbageCollectorController": newGarbageCollectorControllerDescriptor(),
|
||||
"EndpointSliceController": newEndpointSliceControllerDescriptor(),
|
||||
"EndpointSliceMirroringController": newEndpointSliceMirroringControllerDescriptor(),
|
||||
"PodDisruptionBudgetController": newDisruptionControllerDescriptor(),
|
||||
}
|
||||
|
||||
tcs := map[string]struct {
|
||||
@ -143,10 +140,10 @@ func TestController_DiscoveryError(t *testing.T) {
|
||||
ObjectOrMetadataInformerFactory: testInformerFactory,
|
||||
InformersStarted: make(chan struct{}),
|
||||
}
|
||||
for funcName, controllerInit := range controllerInitFuncMap {
|
||||
_, _, err := controllerInit(context.TODO(), ctx)
|
||||
for controllerName, controllerDesc := range controllerDescriptorMap {
|
||||
_, _, err := controllerDesc.GetInitFunc()(context.TODO(), ctx, controllerName)
|
||||
if test.expectedErr != (err != nil) {
|
||||
t.Errorf("%v test failed for use case: %v", funcName, name)
|
||||
t.Errorf("%v test failed for use case: %v", controllerName, name)
|
||||
}
|
||||
}
|
||||
_, _, err := startModifiedNamespaceController(
|
||||
|
@ -23,11 +23,19 @@ import (
|
||||
"context"
|
||||
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice"
|
||||
endpointslicemirroringcontroller "k8s.io/kubernetes/pkg/controller/endpointslicemirroring"
|
||||
)
|
||||
|
||||
func startEndpointSliceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newEndpointSliceControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.EndpointSliceController,
|
||||
initFunc: startEndpointSliceController,
|
||||
}
|
||||
}
|
||||
|
||||
func startEndpointSliceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go endpointslicecontroller.NewController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
@ -41,7 +49,14 @@ func startEndpointSliceController(ctx context.Context, controllerContext Control
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newEndpointSliceMirroringControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.EndpointSliceMirroringController,
|
||||
initFunc: startEndpointSliceMirroringController,
|
||||
}
|
||||
}
|
||||
|
||||
func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go endpointslicemirroringcontroller.NewController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Core().V1().Endpoints(),
|
||||
|
@ -25,10 +25,18 @@ import (
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/scale"
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/disruption"
|
||||
)
|
||||
|
||||
func startDisruptionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newDisruptionControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.DisruptionController,
|
||||
initFunc: startDisruptionController,
|
||||
}
|
||||
}
|
||||
|
||||
func startDisruptionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
client := controllerContext.ClientBuilder.ClientOrDie("disruption-controller")
|
||||
config := controllerContext.ClientBuilder.ConfigOrDie("disruption-controller")
|
||||
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
|
||||
|
@ -20,10 +20,18 @@ import (
|
||||
"context"
|
||||
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/clusterroleaggregation"
|
||||
)
|
||||
|
||||
func startClusterRoleAggregrationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newClusterRoleAggregrationControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.ClusterRoleAggregationController,
|
||||
initFunc: startClusterRoleAggregationController,
|
||||
}
|
||||
}
|
||||
|
||||
func startClusterRoleAggregationController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
go clusterroleaggregation.NewClusterRoleAggregation(
|
||||
controllerContext.InformerFactory.Rbac().V1().ClusterRoles(),
|
||||
controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(),
|
||||
|
@ -97,7 +97,7 @@ func StartTestServer(ctx context.Context, customFlags []string) (result TestServ
|
||||
if err != nil {
|
||||
return TestServer{}, err
|
||||
}
|
||||
all, disabled, aliases := app.KnownControllers(), app.ControllersDisabledByDefault.List(), names.KCMControllerAliases()
|
||||
all, disabled, aliases := app.KnownControllers(), app.ControllersDisabledByDefault(), names.KCMControllerAliases()
|
||||
namedFlagSets := s.Flags(all, disabled, aliases)
|
||||
for _, f := range namedFlagSets.FlagSets {
|
||||
fs.AddFlagSet(f)
|
||||
|
@ -21,14 +21,26 @@ import (
|
||||
|
||||
pluginvalidatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
|
||||
"k8s.io/apiserver/pkg/cel/openapi/resolver"
|
||||
genericfeatures "k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/component-base/featuregate"
|
||||
"k8s.io/controller-manager/controller"
|
||||
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
|
||||
"k8s.io/kubernetes/pkg/controller/validatingadmissionpolicystatus"
|
||||
"k8s.io/kubernetes/pkg/generated/openapi"
|
||||
)
|
||||
|
||||
func startValidatingAdmissionPolicyStatusController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
func newValidatingAdmissionPolicyStatusControllerDescriptor() *ControllerDescriptor {
|
||||
return &ControllerDescriptor{
|
||||
name: names.ValidatingAdmissionPolicyStatusController,
|
||||
initFunc: startValidatingAdmissionPolicyStatusController,
|
||||
requiredFeatureGates: []featuregate.Feature{
|
||||
genericfeatures.ValidatingAdmissionPolicy,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func startValidatingAdmissionPolicyStatusController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
|
||||
// KCM won't start the controller without the feature gate set.
|
||||
typeChecker := &pluginvalidatingadmissionpolicy.TypeChecker{
|
||||
SchemaResolver: resolver.NewDefinitionsSchemaResolver(scheme.Scheme, openapi.GetOpenAPIDefinitions),
|
||||
|
@ -33,20 +33,17 @@ import cpnames "k8s.io/cloud-provider/names"
|
||||
//
|
||||
// USE CASES
|
||||
// The following places should use the controller name constants, when:
|
||||
// 1. registering a controller in app.NewControllerInitializers or app.KnownControllers:
|
||||
// 1.1. disabling a controller by default in app.ControllersDisabledByDefault
|
||||
// 1.2. checking if IsControllerEnabled
|
||||
// 1.3. defining an alias in KCMControllerAliases (for backwards compatibility only)
|
||||
// 2. used anywhere inside the controller itself:
|
||||
// 2.1. [TODO] logger component should be configured with the controller name by calling LoggerWithName
|
||||
// 2.2. [TODO] logging should use a canonical controller name when referencing a controller (Eg. Starting X, Shutting down X)
|
||||
// 2.3. [TODO] emitted events should have an EventSource.Component set to the controller name (usually when initializing an EventRecorder)
|
||||
// 2.4. [TODO] registering ControllerManagerMetrics with ControllerStarted and ControllerStopped
|
||||
// 2.5. [TODO] calling WaitForNamedCacheSync
|
||||
// 3. defining controller options for "--help" command or generated documentation
|
||||
// 3.1. controller name should be used to create a pflag.FlagSet when registering controller options (the name is rendered in a controller flag group header)
|
||||
// 3.2. when defined flag's help mentions a controller name
|
||||
// 4. defining a new service account for a new controller (old controllers may have inconsistent service accounts to stay backwards compatible)
|
||||
// 1. defining a new app.ControllerDescriptor so it can be used in app.NewControllerDescriptors or app.KnownControllers:
|
||||
// 2. defining an alias in KCMControllerAliases (for backwards compatibility only)
|
||||
// 3. used anywhere inside the controller itself:
|
||||
// 3.1. [TODO] logging should use a canonical controller name when referencing a controller (Eg. Starting X, Shutting down X)
|
||||
// 3.2. [TODO] emitted events should have an EventSource.Component set to the controller name (usually when initializing an EventRecorder)
|
||||
// 3.3. [TODO] registering ControllerManagerMetrics with ControllerStarted and ControllerStopped
|
||||
// 3.4. [TODO] calling WaitForNamedCacheSync
|
||||
// 4. defining controller options for "--help" command or generated documentation
|
||||
// 1.1. controller name should be used to create a pflag.FlagSet when registering controller options (the name is rendered in a controller flag group header) in options.KubeControllerManagerOptions
|
||||
// 1.2. when defined flag's help mentions a controller name
|
||||
// 5. defining a new service account for a new controller (old controllers may have inconsistent service accounts to stay backwards compatible)
|
||||
const (
|
||||
ServiceAccountTokenController = "serviceaccount-token-controller"
|
||||
EndpointsController = "endpoints-controller"
|
||||
|
Loading…
Reference in New Issue
Block a user