mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 09:52:49 +00:00
Merge pull request #116529 from pohly/controllers-with-name
kube-controller-manager: convert to structured logging
This commit is contained in:
commit
27e23bad7d
@ -34,7 +34,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "daemonset-controller"))
|
|
||||||
dsc, err := daemon.NewDaemonSetsController(
|
dsc, err := daemon.NewDaemonSetsController(
|
||||||
ctx,
|
ctx,
|
||||||
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
|
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
|
||||||
@ -52,7 +51,6 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "statefulset"))
|
|
||||||
go statefulset.NewStatefulSetController(
|
go statefulset.NewStatefulSetController(
|
||||||
ctx,
|
ctx,
|
||||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||||
@ -76,7 +74,6 @@ func startReplicaSetController(ctx context.Context, controllerContext Controller
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "deployment"))
|
|
||||||
dc, err := deployment.NewDeploymentController(
|
dc, err := deployment.NewDeploymentController(
|
||||||
ctx,
|
ctx,
|
||||||
controllerContext.InformerFactory.Apps().V1().Deployments(),
|
controllerContext.InformerFactory.Apps().V1().Deployments(),
|
||||||
|
@ -27,7 +27,6 @@ import (
|
|||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
"k8s.io/client-go/scale"
|
"k8s.io/client-go/scale"
|
||||||
"k8s.io/controller-manager/controller"
|
"k8s.io/controller-manager/controller"
|
||||||
"k8s.io/klog/v2"
|
|
||||||
"k8s.io/kubernetes/pkg/controller/podautoscaler"
|
"k8s.io/kubernetes/pkg/controller/podautoscaler"
|
||||||
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
|
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
@ -38,9 +37,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func startHPAController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startHPAController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "hpa-controller"))
|
|
||||||
|
|
||||||
if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
|
if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
@ -23,8 +23,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
"k8s.io/controller-manager/controller"
|
"k8s.io/controller-manager/controller"
|
||||||
"k8s.io/kubernetes/pkg/controller/cronjob"
|
"k8s.io/kubernetes/pkg/controller/cronjob"
|
||||||
"k8s.io/kubernetes/pkg/controller/job"
|
"k8s.io/kubernetes/pkg/controller/job"
|
||||||
@ -40,7 +38,6 @@ func startJobController(ctx context.Context, controllerContext ControllerContext
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "cronjob-controller"))
|
|
||||||
cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(),
|
cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(),
|
||||||
controllerContext.InformerFactory.Batch().V1().CronJobs(),
|
controllerContext.InformerFactory.Batch().V1().CronJobs(),
|
||||||
controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
|
controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
|
||||||
|
@ -20,14 +20,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
"k8s.io/controller-manager/controller"
|
"k8s.io/controller-manager/controller"
|
||||||
"k8s.io/kubernetes/pkg/controller/bootstrap"
|
"k8s.io/kubernetes/pkg/controller/bootstrap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func startBootstrapSignerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startBootstrapSignerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "bootstrap-signer-controller"))
|
|
||||||
bsc, err := bootstrap.NewSigner(
|
bsc, err := bootstrap.NewSigner(
|
||||||
controllerContext.ClientBuilder.ClientOrDie("bootstrap-signer"),
|
controllerContext.ClientBuilder.ClientOrDie("bootstrap-signer"),
|
||||||
controllerContext.InformerFactory.Core().V1().Secrets(),
|
controllerContext.InformerFactory.Core().V1().Secrets(),
|
||||||
@ -42,7 +39,6 @@ func startBootstrapSignerController(ctx context.Context, controllerContext Contr
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startTokenCleanerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startTokenCleanerController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "token-cleaner-controller"))
|
|
||||||
tcc, err := bootstrap.NewTokenCleaner(
|
tcc, err := bootstrap.NewTokenCleaner(
|
||||||
controllerContext.ClientBuilder.ClientOrDie("token-cleaner"),
|
controllerContext.ClientBuilder.ClientOrDie("token-cleaner"),
|
||||||
controllerContext.InformerFactory.Core().V1().Secrets(),
|
controllerContext.InformerFactory.Core().V1().Secrets(),
|
||||||
|
@ -33,9 +33,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func startCSRSigningController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startCSRSigningController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
missingSingleSigningFile := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || controllerContext.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == ""
|
missingSingleSigningFile := controllerContext.ComponentConfig.CSRSigningController.ClusterSigningCertFile == "" || controllerContext.ComponentConfig.CSRSigningController.ClusterSigningKeyFile == ""
|
||||||
if missingSingleSigningFile && !anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
|
if missingSingleSigningFile && !anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
|
||||||
klog.V(2).Info("skipping CSR signer controller because no csr cert/key was specified")
|
logger.Info("Skipping CSR signer controller because no csr cert/key was specified")
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
if !missingSingleSigningFile && anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
|
if !missingSingleSigningFile && anySpecificFilesSet(controllerContext.ComponentConfig.CSRSigningController) {
|
||||||
@ -53,7 +54,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
|
|||||||
}
|
}
|
||||||
go kubeletServingSigner.Run(ctx, 5)
|
go kubeletServingSigner.Run(ctx, 5)
|
||||||
} else {
|
} else {
|
||||||
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving")
|
logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kubelet-serving")
|
||||||
}
|
}
|
||||||
|
|
||||||
if kubeletClientSignerCertFile, kubeletClientSignerKeyFile := getKubeletClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletClientSignerCertFile) > 0 || len(kubeletClientSignerKeyFile) > 0 {
|
if kubeletClientSignerCertFile, kubeletClientSignerKeyFile := getKubeletClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeletClientSignerCertFile) > 0 || len(kubeletClientSignerKeyFile) > 0 {
|
||||||
@ -63,7 +64,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
|
|||||||
}
|
}
|
||||||
go kubeletClientSigner.Run(ctx, 5)
|
go kubeletClientSigner.Run(ctx, 5)
|
||||||
} else {
|
} else {
|
||||||
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet")
|
logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kube-apiserver-client-kubelet")
|
||||||
}
|
}
|
||||||
|
|
||||||
if kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile := getKubeAPIServerClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeAPIServerSignerCertFile) > 0 || len(kubeAPIServerSignerKeyFile) > 0 {
|
if kubeAPIServerSignerCertFile, kubeAPIServerSignerKeyFile := getKubeAPIServerClientSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(kubeAPIServerSignerCertFile) > 0 || len(kubeAPIServerSignerKeyFile) > 0 {
|
||||||
@ -73,7 +74,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
|
|||||||
}
|
}
|
||||||
go kubeAPIServerClientSigner.Run(ctx, 5)
|
go kubeAPIServerClientSigner.Run(ctx, 5)
|
||||||
} else {
|
} else {
|
||||||
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client")
|
logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/kube-apiserver-client")
|
||||||
}
|
}
|
||||||
|
|
||||||
if legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile := getLegacyUnknownSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(legacyUnknownSignerCertFile) > 0 || len(legacyUnknownSignerKeyFile) > 0 {
|
if legacyUnknownSignerCertFile, legacyUnknownSignerKeyFile := getLegacyUnknownSignerFiles(controllerContext.ComponentConfig.CSRSigningController); len(legacyUnknownSignerCertFile) > 0 || len(legacyUnknownSignerKeyFile) > 0 {
|
||||||
@ -83,7 +84,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
|
|||||||
}
|
}
|
||||||
go legacyUnknownSigner.Run(ctx, 5)
|
go legacyUnknownSigner.Run(ctx, 5)
|
||||||
} else {
|
} else {
|
||||||
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown")
|
logger.Info("Skipping CSR signer controller because specific files were specified for other signers and not this one", "controller", "kubernetes.io/legacy-unknown")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
|
|
||||||
// createCloudProvider helps consolidate what is needed for cloud providers, we explicitly list the things
|
// createCloudProvider helps consolidate what is needed for cloud providers, we explicitly list the things
|
||||||
// that the cloud providers need as parameters, so we can control
|
// that the cloud providers need as parameters, so we can control
|
||||||
func createCloudProvider(cloudProvider string, externalCloudVolumePlugin string, cloudConfigFile string,
|
func createCloudProvider(logger klog.Logger, cloudProvider string, externalCloudVolumePlugin string, cloudConfigFile string,
|
||||||
allowUntaggedCloud bool, sharedInformers informers.SharedInformerFactory) (cloudprovider.Interface, ControllerLoopMode, error) {
|
allowUntaggedCloud bool, sharedInformers informers.SharedInformerFactory) (cloudprovider.Interface, ControllerLoopMode, error) {
|
||||||
var cloud cloudprovider.Interface
|
var cloud cloudprovider.Interface
|
||||||
var loopMode ControllerLoopMode
|
var loopMode ControllerLoopMode
|
||||||
@ -62,7 +62,7 @@ func createCloudProvider(cloudProvider string, externalCloudVolumePlugin string,
|
|||||||
|
|
||||||
if cloud != nil && !cloud.HasClusterID() {
|
if cloud != nil && !cloud.HasClusterID() {
|
||||||
if allowUntaggedCloud {
|
if allowUntaggedCloud {
|
||||||
klog.Warning("detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
|
logger.Info("Warning: detected a cluster without a ClusterID. A ClusterID will be required in the future. Please tag your cluster to avoid any future issues")
|
||||||
} else {
|
} else {
|
||||||
return nil, loopMode, fmt.Errorf("no ClusterID Found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
|
return nil, loopMode, fmt.Errorf("no ClusterID Found. A ClusterID is required for the cloud provider to function properly. This check can be bypassed by setting the allow-untagged-cloud option")
|
||||||
}
|
}
|
||||||
|
@ -104,7 +104,8 @@ const (
|
|||||||
func NewControllerManagerCommand() *cobra.Command {
|
func NewControllerManagerCommand() *cobra.Command {
|
||||||
s, err := options.NewKubeControllerManagerOptions()
|
s, err := options.NewKubeControllerManagerOptions()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("unable to initialize command options: %v", err)
|
klog.Background().Error(err, "Unable to initialize command options")
|
||||||
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := &cobra.Command{
|
cmd := &cobra.Command{
|
||||||
@ -140,7 +141,7 @@ controller, and serviceaccounts controller.`,
|
|||||||
}
|
}
|
||||||
// add feature enablement metrics
|
// add feature enablement metrics
|
||||||
utilfeature.DefaultMutableFeatureGate.AddMetrics()
|
utilfeature.DefaultMutableFeatureGate.AddMetrics()
|
||||||
return Run(c.Complete(), wait.NeverStop)
|
return Run(context.Background(), c.Complete())
|
||||||
},
|
},
|
||||||
Args: func(cmd *cobra.Command, args []string) error {
|
Args: func(cmd *cobra.Command, args []string) error {
|
||||||
for _, arg := range args {
|
for _, arg := range args {
|
||||||
@ -178,11 +179,14 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run runs the KubeControllerManagerOptions.
|
// Run runs the KubeControllerManagerOptions.
|
||||||
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
func Run(ctx context.Context, c *config.CompletedConfig) error {
|
||||||
// To help debugging, immediately log version
|
logger := klog.FromContext(ctx)
|
||||||
klog.Infof("Version: %+v", version.Get())
|
stopCh := ctx.Done()
|
||||||
|
|
||||||
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
|
// To help debugging, immediately log version
|
||||||
|
logger.Info("Starting", "version", version.Get())
|
||||||
|
|
||||||
|
logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
|
||||||
|
|
||||||
// Start events processing pipeline.
|
// Start events processing pipeline.
|
||||||
c.EventBroadcaster.StartStructuredLogging(0)
|
c.EventBroadcaster.StartStructuredLogging(0)
|
||||||
@ -192,7 +196,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
if cfgz, err := configz.New(ConfigzName); err == nil {
|
if cfgz, err := configz.New(ConfigzName); err == nil {
|
||||||
cfgz.Set(c.ComponentConfig)
|
cfgz.Set(c.ComponentConfig)
|
||||||
} else {
|
} else {
|
||||||
klog.Errorf("unable to register configz: %v", err)
|
logger.Error(err, "Unable to register configz")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup any healthz checks we will want to use.
|
// Setup any healthz checks we will want to use.
|
||||||
@ -219,18 +223,20 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
clientBuilder, rootClientBuilder := createClientBuilders(c)
|
clientBuilder, rootClientBuilder := createClientBuilders(logger, c)
|
||||||
|
|
||||||
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
|
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
|
||||||
|
|
||||||
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
|
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
|
||||||
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
|
controllerContext, err := CreateControllerContext(logger, c, rootClientBuilder, clientBuilder, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("error building controller context: %v", err)
|
logger.Error(err, "Error building controller context")
|
||||||
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
}
|
}
|
||||||
controllerInitializers := initializersFunc(controllerContext.LoopMode)
|
controllerInitializers := initializersFunc(controllerContext.LoopMode)
|
||||||
if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
|
if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
|
||||||
klog.Fatalf("error starting controllers: %v", err)
|
logger.Error(err, "Error starting controllers")
|
||||||
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
controllerContext.InformerFactory.Start(stopCh)
|
controllerContext.InformerFactory.Start(stopCh)
|
||||||
@ -242,7 +248,6 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
|
|
||||||
// No leader election, run directly
|
// No leader election, run directly
|
||||||
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
||||||
ctx := wait.ContextForChannel(stopCh)
|
|
||||||
run(ctx, saTokenControllerInitFunc, NewControllerInitializers)
|
run(ctx, saTokenControllerInitFunc, NewControllerInitializers)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -263,7 +268,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
|
|
||||||
// If leader migration is enabled, create the LeaderMigrator and prepare for migration
|
// If leader migration is enabled, create the LeaderMigrator and prepare for migration
|
||||||
if leadermigration.Enabled(&c.ComponentConfig.Generic) {
|
if leadermigration.Enabled(&c.ComponentConfig.Generic) {
|
||||||
klog.Infof("starting leader migration")
|
logger.Info("starting leader migration")
|
||||||
|
|
||||||
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
|
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
|
||||||
"kube-controller-manager")
|
"kube-controller-manager")
|
||||||
@ -277,7 +282,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start the main lock
|
// Start the main lock
|
||||||
go leaderElectAndRun(c, id, electionChecker,
|
go leaderElectAndRun(ctx, c, id, electionChecker,
|
||||||
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
|
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
|
||||||
c.ComponentConfig.Generic.LeaderElection.ResourceName,
|
c.ComponentConfig.Generic.LeaderElection.ResourceName,
|
||||||
leaderelection.LeaderCallbacks{
|
leaderelection.LeaderCallbacks{
|
||||||
@ -287,12 +292,12 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
// If leader migration is enabled, we should start only non-migrated controllers
|
// If leader migration is enabled, we should start only non-migrated controllers
|
||||||
// for the main lock.
|
// for the main lock.
|
||||||
initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
|
initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
|
||||||
klog.Info("leader migration: starting main controllers.")
|
logger.Info("leader migration: starting main controllers.")
|
||||||
}
|
}
|
||||||
run(ctx, startSATokenController, initializersFunc)
|
run(ctx, startSATokenController, initializersFunc)
|
||||||
},
|
},
|
||||||
OnStoppedLeading: func() {
|
OnStoppedLeading: func() {
|
||||||
klog.ErrorS(nil, "leaderelection lost")
|
logger.Error(nil, "leaderelection lost")
|
||||||
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -306,17 +311,17 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
<-leaderMigrator.MigrationReady
|
<-leaderMigrator.MigrationReady
|
||||||
|
|
||||||
// Start the migration lock.
|
// Start the migration lock.
|
||||||
go leaderElectAndRun(c, id, electionChecker,
|
go leaderElectAndRun(ctx, c, id, electionChecker,
|
||||||
c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
|
c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
|
||||||
c.ComponentConfig.Generic.LeaderMigration.LeaderName,
|
c.ComponentConfig.Generic.LeaderMigration.LeaderName,
|
||||||
leaderelection.LeaderCallbacks{
|
leaderelection.LeaderCallbacks{
|
||||||
OnStartedLeading: func(ctx context.Context) {
|
OnStartedLeading: func(ctx context.Context) {
|
||||||
klog.Info("leader migration: starting migrated controllers.")
|
logger.Info("leader migration: starting migrated controllers.")
|
||||||
// DO NOT start saTokenController under migration lock
|
// DO NOT start saTokenController under migration lock
|
||||||
run(ctx, nil, createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
|
run(ctx, nil, createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
|
||||||
},
|
},
|
||||||
OnStoppedLeading: func() {
|
OnStoppedLeading: func() {
|
||||||
klog.ErrorS(nil, "migration leaderelection lost")
|
logger.Error(nil, "migration leaderelection lost")
|
||||||
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -514,7 +519,7 @@ func GetAvailableResources(clientBuilder clientbuilder.ControllerClientBuilder)
|
|||||||
// CreateControllerContext creates a context struct containing references to resources needed by the
|
// CreateControllerContext creates a context struct containing references to resources needed by the
|
||||||
// controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for
|
// controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for
|
||||||
// the shared-informers client and token controller.
|
// the shared-informers client and token controller.
|
||||||
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
|
func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
|
||||||
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
|
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
|
||||||
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
|
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
|
||||||
|
|
||||||
@ -540,7 +545,7 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
|
|||||||
return ControllerContext{}, err
|
return ControllerContext{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cloud, loopMode, err := createCloudProvider(s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
|
cloud, loopMode, err := createCloudProvider(logger, s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
|
||||||
s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
|
s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ControllerContext{}, err
|
return ControllerContext{}, err
|
||||||
@ -566,6 +571,8 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
|
|||||||
// StartControllers starts a set of controllers with a specified ControllerContext
|
// StartControllers starts a set of controllers with a specified ControllerContext
|
||||||
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
|
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
|
||||||
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
|
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
|
||||||
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
|
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
|
||||||
// If this fails, just return here and fail since other controllers won't be able to get credentials.
|
// If this fails, just return here and fail since other controllers won't be able to get credentials.
|
||||||
if startSATokenController != nil {
|
if startSATokenController != nil {
|
||||||
@ -582,22 +589,31 @@ func StartControllers(ctx context.Context, controllerCtx ControllerContext, star
|
|||||||
|
|
||||||
var controllerChecks []healthz.HealthChecker
|
var controllerChecks []healthz.HealthChecker
|
||||||
|
|
||||||
|
// Each controller is passed a context where the logger has the name of
|
||||||
|
// the controller set through WithName. That name then becomes the prefix of
|
||||||
|
// of all log messages emitted by that controller.
|
||||||
|
//
|
||||||
|
// In this loop, an explicit "controller" key is used instead, for two reasons:
|
||||||
|
// - while contextual logging is alpha, klog.LoggerWithName is still a no-op,
|
||||||
|
// so we cannot rely on it yet to add the name
|
||||||
|
// - it allows distinguishing between log entries emitted by the controller
|
||||||
|
// and those emitted for it - this is a bit debatable and could be revised.
|
||||||
for controllerName, initFn := range controllers {
|
for controllerName, initFn := range controllers {
|
||||||
if !controllerCtx.IsControllerEnabled(controllerName) {
|
if !controllerCtx.IsControllerEnabled(controllerName) {
|
||||||
klog.Warningf("%q is disabled", controllerName)
|
logger.Info("Warning: controller is disabled", "controller", controllerName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
|
|
||||||
klog.V(1).Infof("Starting %q", controllerName)
|
logger.V(1).Info("Starting controller", "controller", controllerName)
|
||||||
ctrl, started, err := initFn(ctx, controllerCtx)
|
ctrl, started, err := initFn(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Error starting %q", controllerName)
|
logger.Error(err, "Error starting controller", "controller", controllerName)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !started {
|
if !started {
|
||||||
klog.Warningf("Skipping %q", controllerName)
|
logger.Info("Warning: skipping controller", "controller", controllerName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
check := controllerhealthz.NamedPingChecker(controllerName)
|
check := controllerhealthz.NamedPingChecker(controllerName)
|
||||||
@ -619,7 +635,7 @@ func StartControllers(ctx context.Context, controllerCtx ControllerContext, star
|
|||||||
}
|
}
|
||||||
controllerChecks = append(controllerChecks, check)
|
controllerChecks = append(controllerChecks, check)
|
||||||
|
|
||||||
klog.Infof("Started %q", controllerName)
|
logger.Info("Started controller", "controller", controllerName)
|
||||||
}
|
}
|
||||||
|
|
||||||
healthzHandler.AddHealthChecker(controllerChecks...)
|
healthzHandler.AddHealthChecker(controllerChecks...)
|
||||||
@ -635,13 +651,14 @@ type serviceAccountTokenControllerStarter struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
if !controllerContext.IsControllerEnabled(saTokenControllerName) {
|
if !controllerContext.IsControllerEnabled(saTokenControllerName) {
|
||||||
klog.Warningf("%q is disabled", saTokenControllerName)
|
logger.Info("Warning: controller is disabled", "controller", saTokenControllerName)
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
|
if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
|
||||||
klog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
|
logger.Info("Controller is disabled because there is no private key", "controller", saTokenControllerName)
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile)
|
privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile)
|
||||||
@ -695,7 +712,7 @@ func readCA(file string) ([]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// createClientBuilders creates clientBuilder and rootClientBuilder from the given configuration
|
// createClientBuilders creates clientBuilder and rootClientBuilder from the given configuration
|
||||||
func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilder.ControllerClientBuilder, rootClientBuilder clientbuilder.ControllerClientBuilder) {
|
func createClientBuilders(logger klog.Logger, c *config.CompletedConfig) (clientBuilder clientbuilder.ControllerClientBuilder, rootClientBuilder clientbuilder.ControllerClientBuilder) {
|
||||||
rootClientBuilder = clientbuilder.SimpleControllerClientBuilder{
|
rootClientBuilder = clientbuilder.SimpleControllerClientBuilder{
|
||||||
ClientConfig: c.Kubeconfig,
|
ClientConfig: c.Kubeconfig,
|
||||||
}
|
}
|
||||||
@ -703,7 +720,7 @@ func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilde
|
|||||||
if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
|
if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
|
||||||
// It's possible another controller process is creating the tokens for us.
|
// It's possible another controller process is creating the tokens for us.
|
||||||
// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
|
// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
|
||||||
klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
|
logger.Info("Warning: --use-service-account-credentials was specified without providing a --service-account-private-key-file")
|
||||||
}
|
}
|
||||||
|
|
||||||
clientBuilder = clientbuilder.NewDynamicClientBuilder(
|
clientBuilder = clientbuilder.NewDynamicClientBuilder(
|
||||||
@ -718,7 +735,8 @@ func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilde
|
|||||||
|
|
||||||
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
|
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
|
||||||
// TODO: extract this function into staging/controller-manager
|
// TODO: extract this function into staging/controller-manager
|
||||||
func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
|
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
|
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
|
||||||
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
|
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
|
||||||
leaseName,
|
leaseName,
|
||||||
@ -729,10 +747,11 @@ func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionC
|
|||||||
c.Kubeconfig,
|
c.Kubeconfig,
|
||||||
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
|
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("error creating lock: %v", err)
|
logger.Error(err, "Error creating lock")
|
||||||
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
|
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
|
||||||
Lock: rl,
|
Lock: rl,
|
||||||
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
|
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
|
||||||
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
|
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
|
||||||
|
@ -89,7 +89,7 @@ func startServiceController(ctx context.Context, controllerContext ControllerCon
|
|||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This error shouldn't fail. It lives like this as a legacy.
|
// This error shouldn't fail. It lives like this as a legacy.
|
||||||
klog.Errorf("Failed to start service controller: %v", err)
|
klog.FromContext(ctx).Error(err, "Failed to start service controller")
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
go serviceController.Run(ctx, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs), controllerContext.ControllerManagerMetrics)
|
go serviceController.Run(ctx, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs), controllerContext.ControllerManagerMetrics)
|
||||||
@ -99,6 +99,7 @@ func startServiceController(ctx context.Context, controllerContext ControllerCon
|
|||||||
func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startNodeIpamController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
var serviceCIDR *net.IPNet
|
var serviceCIDR *net.IPNet
|
||||||
var secondaryServiceCIDR *net.IPNet
|
var secondaryServiceCIDR *net.IPNet
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
|
||||||
// should we start nodeIPAM
|
// should we start nodeIPAM
|
||||||
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
|
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
|
||||||
@ -119,14 +120,14 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo
|
|||||||
if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
|
if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
|
||||||
_, serviceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)
|
_, serviceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR, err)
|
logger.Info("Warning: unsuccessful parsing of service CIDR", "CIDR", controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR, "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 {
|
if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 {
|
||||||
_, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)
|
_, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err)
|
logger.Info("Warning: unsuccessful parsing of service CIDR", "CIDR", controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,7 +155,6 @@ func startNodeIpamController(ctx context.Context, controllerContext ControllerCo
|
|||||||
clusterCIDRInformer = controllerContext.InformerFactory.Networking().V1alpha1().ClusterCIDRs()
|
clusterCIDRInformer = controllerContext.InformerFactory.Networking().V1alpha1().ClusterCIDRs()
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "NodeIpamController"))
|
|
||||||
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
|
nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
|
||||||
ctx,
|
ctx,
|
||||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||||
@ -199,6 +199,7 @@ func startNodeLifecycleController(ctx context.Context, controllerContext Control
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
|
cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
|
||||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||||
// cloud node lifecycle controller uses existing cluster role from node-controller
|
// cloud node lifecycle controller uses existing cluster role from node-controller
|
||||||
@ -209,7 +210,7 @@ func startCloudNodeLifecycleController(ctx context.Context, controllerContext Co
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// the controller manager should continue to run if the "Instances" interface is not
|
// the controller manager should continue to run if the "Instances" interface is not
|
||||||
// supported, though it's unlikely for a cloud provider to not support it
|
// supported, though it's unlikely for a cloud provider to not support it
|
||||||
klog.Errorf("failed to start cloud node lifecycle controller: %v", err)
|
logger.Error(err, "Failed to start cloud node lifecycle controller")
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,17 +219,18 @@ func startCloudNodeLifecycleController(ctx context.Context, controllerContext Co
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startRouteController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startRouteController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
|
if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
|
||||||
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
|
logger.Info("Will not configure cloud provider routes for allocate-node-cidrs", "CIDRs", controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, "routes", controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
if controllerContext.Cloud == nil {
|
if controllerContext.Cloud == nil {
|
||||||
klog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
|
logger.Info("Warning: configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
routes, ok := controllerContext.Cloud.Routes()
|
routes, ok := controllerContext.Cloud.Routes()
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
logger.Info("Warning: configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,7 +249,8 @@ func startRouteController(ctx context.Context, controllerContext ControllerConte
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
plugins, err := ProbeControllerVolumePlugins(controllerContext.Cloud, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
logger := klog.FromContext(ctx)
|
||||||
|
plugins, err := ProbeControllerVolumePlugins(logger, controllerContext.Cloud, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
|
return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
|
||||||
}
|
}
|
||||||
@ -271,7 +274,6 @@ func startPersistentVolumeBinderController(ctx context.Context, controllerContex
|
|||||||
EnableDynamicProvisioning: controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
|
EnableDynamicProvisioning: controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
|
||||||
FilteredDialOptions: filteredDialOptions,
|
FilteredDialOptions: filteredDialOptions,
|
||||||
}
|
}
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "persistentvolume-binder-controller"))
|
|
||||||
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(ctx, params)
|
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(ctx, params)
|
||||||
if volumeControllerErr != nil {
|
if volumeControllerErr != nil {
|
||||||
return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
|
return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
|
||||||
@ -281,10 +283,11 @@ func startPersistentVolumeBinderController(ctx context.Context, controllerContex
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startAttachDetachController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startAttachDetachController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes()
|
csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes()
|
||||||
csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers()
|
csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers()
|
||||||
|
|
||||||
plugins, err := ProbeAttachableVolumePlugins()
|
plugins, err := ProbeAttachableVolumePlugins(logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, true, fmt.Errorf("failed to probe volume plugins when starting attach/detach controller: %v", err)
|
return nil, true, fmt.Errorf("failed to probe volume plugins when starting attach/detach controller: %v", err)
|
||||||
}
|
}
|
||||||
@ -296,7 +299,6 @@ func startAttachDetachController(ctx context.Context, controllerContext Controll
|
|||||||
return nil, true, err
|
return nil, true, err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger := klog.LoggerWithName(klog.FromContext(ctx), "attachdetach-controller")
|
|
||||||
ctx = klog.NewContext(ctx, logger)
|
ctx = klog.NewContext(ctx, logger)
|
||||||
attachDetachController, attachDetachControllerErr :=
|
attachDetachController, attachDetachControllerErr :=
|
||||||
attachdetach.NewAttachDetachController(
|
attachdetach.NewAttachDetachController(
|
||||||
@ -325,7 +327,8 @@ func startAttachDetachController(ctx context.Context, controllerContext Controll
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startVolumeExpandController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startVolumeExpandController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
plugins, err := ProbeExpandableVolumePlugins(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
logger := klog.FromContext(ctx)
|
||||||
|
plugins, err := ProbeExpandableVolumePlugins(logger, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err)
|
return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err)
|
||||||
}
|
}
|
||||||
@ -350,14 +353,12 @@ func startVolumeExpandController(ctx context.Context, controllerContext Controll
|
|||||||
if expandControllerErr != nil {
|
if expandControllerErr != nil {
|
||||||
return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr)
|
return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr)
|
||||||
}
|
}
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "persistentvolume-expander-controller"))
|
|
||||||
go expandController.Run(ctx)
|
go expandController.Run(ctx)
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "ephemeral-volume-controller"))
|
|
||||||
ephemeralController, err := ephemeral.NewController(
|
ephemeralController, err := ephemeral.NewController(
|
||||||
controllerContext.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
|
controllerContext.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
|
||||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||||
@ -417,7 +418,6 @@ func startPodGCController(ctx context.Context, controllerContext ControllerConte
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "resourcequota-controller"))
|
|
||||||
resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
|
resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
|
||||||
resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
|
resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
|
||||||
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
|
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
|
||||||
@ -468,7 +468,6 @@ func startModifiedNamespaceController(ctx context.Context, controllerContext Con
|
|||||||
|
|
||||||
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
|
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
|
||||||
|
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "namespace"))
|
|
||||||
namespaceController := namespacecontroller.NewNamespaceController(
|
namespaceController := namespacecontroller.NewNamespaceController(
|
||||||
ctx,
|
ctx,
|
||||||
namespaceKubeClient,
|
namespaceKubeClient,
|
||||||
@ -498,7 +497,6 @@ func startServiceAccountController(ctx context.Context, controllerContext Contro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startTTLController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startTTLController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "ttl"))
|
|
||||||
go ttlcontroller.NewTTLController(
|
go ttlcontroller.NewTTLController(
|
||||||
ctx,
|
ctx,
|
||||||
controllerContext.InformerFactory.Core().V1().Nodes(),
|
controllerContext.InformerFactory.Core().V1().Nodes(),
|
||||||
@ -512,8 +510,6 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
|
|||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "garbagecollector"))
|
|
||||||
|
|
||||||
gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector")
|
gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector")
|
||||||
discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
|
discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
|
||||||
|
|
||||||
@ -554,7 +550,6 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startPVCProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startPVCProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "persistentvolumeclaim-protection-controller"))
|
|
||||||
pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
|
pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
|
||||||
klog.FromContext(ctx),
|
klog.FromContext(ctx),
|
||||||
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||||
@ -569,7 +564,6 @@ func startPVCProtectionController(ctx context.Context, controllerContext Control
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startPVProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startPVProtectionController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "persistentvolume-protection-controller"))
|
|
||||||
go pvprotection.NewPVProtectionController(
|
go pvprotection.NewPVProtectionController(
|
||||||
klog.FromContext(ctx),
|
klog.FromContext(ctx),
|
||||||
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
|
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
|
||||||
@ -579,7 +573,6 @@ func startPVProtectionController(ctx context.Context, controllerContext Controll
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "ttlafterfinished"))
|
|
||||||
go ttlafterfinished.New(
|
go ttlafterfinished.New(
|
||||||
ctx,
|
ctx,
|
||||||
controllerContext.InformerFactory.Batch().V1().Jobs(),
|
controllerContext.InformerFactory.Batch().V1().Jobs(),
|
||||||
@ -705,7 +698,6 @@ func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, cl
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "storageVersionGC"))
|
|
||||||
go storageversiongc.NewStorageVersionGC(
|
go storageversiongc.NewStorageVersionGC(
|
||||||
ctx,
|
ctx,
|
||||||
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
|
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
|
||||||
|
@ -49,10 +49,10 @@ import (
|
|||||||
// detach controller.
|
// detach controller.
|
||||||
// The list of plugins is manually compiled. This code and the plugin
|
// The list of plugins is manually compiled. This code and the plugin
|
||||||
// initialization code for kubelet really, really need a through refactor.
|
// initialization code for kubelet really, really need a through refactor.
|
||||||
func ProbeAttachableVolumePlugins() ([]volume.VolumePlugin, error) {
|
func ProbeAttachableVolumePlugins(logger klog.Logger) ([]volume.VolumePlugin, error) {
|
||||||
var err error
|
var err error
|
||||||
allPlugins := []volume.VolumePlugin{}
|
allPlugins := []volume.VolumePlugin{}
|
||||||
allPlugins, err = appendAttachableLegacyProviderVolumes(allPlugins, utilfeature.DefaultFeatureGate)
|
allPlugins, err = appendAttachableLegacyProviderVolumes(logger, allPlugins, utilfeature.DefaultFeatureGate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return allPlugins, err
|
return allPlugins, err
|
||||||
}
|
}
|
||||||
@ -70,10 +70,10 @@ func GetDynamicPluginProber(config persistentvolumeconfig.VolumeConfiguration) v
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ProbeExpandableVolumePlugins returns volume plugins which are expandable
|
// ProbeExpandableVolumePlugins returns volume plugins which are expandable
|
||||||
func ProbeExpandableVolumePlugins(config persistentvolumeconfig.VolumeConfiguration) ([]volume.VolumePlugin, error) {
|
func ProbeExpandableVolumePlugins(logger klog.Logger, config persistentvolumeconfig.VolumeConfiguration) ([]volume.VolumePlugin, error) {
|
||||||
var err error
|
var err error
|
||||||
allPlugins := []volume.VolumePlugin{}
|
allPlugins := []volume.VolumePlugin{}
|
||||||
allPlugins, err = appendExpandableLegacyProviderVolumes(allPlugins, utilfeature.DefaultFeatureGate)
|
allPlugins, err = appendExpandableLegacyProviderVolumes(logger, allPlugins, utilfeature.DefaultFeatureGate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return allPlugins, err
|
return allPlugins, err
|
||||||
}
|
}
|
||||||
@ -84,7 +84,7 @@ func ProbeExpandableVolumePlugins(config persistentvolumeconfig.VolumeConfigurat
|
|||||||
// ProbeControllerVolumePlugins collects all persistent volume plugins into an
|
// ProbeControllerVolumePlugins collects all persistent volume plugins into an
|
||||||
// easy to use list. Only volume plugins that implement any of
|
// easy to use list. Only volume plugins that implement any of
|
||||||
// provisioner/recycler/deleter interface should be returned.
|
// provisioner/recycler/deleter interface should be returned.
|
||||||
func ProbeControllerVolumePlugins(cloud cloudprovider.Interface, config persistentvolumeconfig.VolumeConfiguration) ([]volume.VolumePlugin, error) {
|
func ProbeControllerVolumePlugins(logger klog.Logger, cloud cloudprovider.Interface, config persistentvolumeconfig.VolumeConfiguration) ([]volume.VolumePlugin, error) {
|
||||||
allPlugins := []volume.VolumePlugin{}
|
allPlugins := []volume.VolumePlugin{}
|
||||||
|
|
||||||
// The list of plugins to probe is decided by this binary, not
|
// The list of plugins to probe is decided by this binary, not
|
||||||
@ -103,7 +103,8 @@ func ProbeControllerVolumePlugins(cloud cloudprovider.Interface, config persiste
|
|||||||
ProvisioningEnabled: config.EnableHostPathProvisioning,
|
ProvisioningEnabled: config.EnableHostPathProvisioning,
|
||||||
}
|
}
|
||||||
if err := AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, &hostPathConfig); err != nil {
|
if err := AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, &hostPathConfig); err != nil {
|
||||||
klog.Fatalf("Could not create hostpath recycler pod from file %s: %+v", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath, err)
|
logger.Error(err, "Could not create hostpath recycler pod from file", "path", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathHostPath)
|
||||||
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
}
|
}
|
||||||
allPlugins = append(allPlugins, hostpath.ProbeVolumePlugins(hostPathConfig)...)
|
allPlugins = append(allPlugins, hostpath.ProbeVolumePlugins(hostPathConfig)...)
|
||||||
|
|
||||||
@ -113,12 +114,13 @@ func ProbeControllerVolumePlugins(cloud cloudprovider.Interface, config persiste
|
|||||||
RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(),
|
RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(),
|
||||||
}
|
}
|
||||||
if err := AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, &nfsConfig); err != nil {
|
if err := AttemptToLoadRecycler(config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, &nfsConfig); err != nil {
|
||||||
klog.Fatalf("Could not create NFS recycler pod from file %s: %+v", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS, err)
|
logger.Error(err, "Could not create NFS recycler pod from file", "path", config.PersistentVolumeRecyclerConfiguration.PodTemplateFilePathNFS)
|
||||||
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
}
|
}
|
||||||
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)
|
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
allPlugins, err = appendExpandableLegacyProviderVolumes(allPlugins, utilfeature.DefaultFeatureGate)
|
allPlugins, err = appendExpandableLegacyProviderVolumes(logger, allPlugins, utilfeature.DefaultFeatureGate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return allPlugins, err
|
return allPlugins, err
|
||||||
}
|
}
|
||||||
|
@ -21,21 +21,22 @@ package app
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/component-base/featuregate"
|
"k8s.io/component-base/featuregate"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
)
|
)
|
||||||
|
|
||||||
func appendAttachableLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
func appendAttachableLegacyProviderVolumes(logger klog.Logger, allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
||||||
// no-op when compiled without legacy cloud providers
|
// no-op when compiled without legacy cloud providers
|
||||||
return allPlugins, nil
|
return allPlugins, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendExpandableLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
func appendExpandableLegacyProviderVolumes(logger klog.Logger, allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
||||||
// no-op when compiled without legacy cloud providers
|
// no-op when compiled without legacy cloud providers
|
||||||
return allPlugins, nil
|
return allPlugins, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
func appendLegacyProviderVolumes(logger klog.Logger, allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
||||||
// no-op when compiled without legacy cloud providers
|
// no-op when compiled without legacy cloud providers
|
||||||
return allPlugins, nil
|
return allPlugins, nil
|
||||||
}
|
}
|
||||||
|
@ -35,18 +35,19 @@ import (
|
|||||||
|
|
||||||
type probeFn func() []volume.VolumePlugin
|
type probeFn func() []volume.VolumePlugin
|
||||||
|
|
||||||
func appendPluginBasedOnFeatureFlags(plugins []volume.VolumePlugin, inTreePluginName string, featureGate featuregate.FeatureGate, pluginInfo pluginInfo) ([]volume.VolumePlugin, error) {
|
func appendPluginBasedOnFeatureFlags(logger klog.Logger, plugins []volume.VolumePlugin, inTreePluginName string, featureGate featuregate.FeatureGate, pluginInfo pluginInfo) ([]volume.VolumePlugin, error) {
|
||||||
|
|
||||||
_, err := csimigration.CheckMigrationFeatureFlags(featureGate, pluginInfo.pluginMigrationFeature, pluginInfo.pluginUnregisterFeature)
|
_, err := csimigration.CheckMigrationFeatureFlags(featureGate, pluginInfo.pluginMigrationFeature, pluginInfo.pluginUnregisterFeature)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Warningf("Unexpected CSI Migration Feature Flags combination detected: %v. CSI Migration may not take effect", err)
|
logger.Error(err, "Unexpected CSI Migration Feature Flags combination detected. CSI Migration may not take effect")
|
||||||
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
// TODO: fail and return here once alpha only tests can set the feature flags for a plugin correctly
|
// TODO: fail and return here once alpha only tests can set the feature flags for a plugin correctly
|
||||||
}
|
}
|
||||||
|
|
||||||
// Skip appending the in-tree plugin to the list of plugins to be probed/initialized
|
// Skip appending the in-tree plugin to the list of plugins to be probed/initialized
|
||||||
// if the plugin unregister feature flag is set
|
// if the plugin unregister feature flag is set
|
||||||
if featureGate.Enabled(pluginInfo.pluginUnregisterFeature) {
|
if featureGate.Enabled(pluginInfo.pluginUnregisterFeature) {
|
||||||
klog.Infof("Skip registration of plugin %s since feature flag %v is enabled", inTreePluginName, pluginInfo.pluginUnregisterFeature)
|
logger.Info("Skip registration of plugin since feature flag is enabled", "plugin", inTreePluginName, "feature", pluginInfo.pluginUnregisterFeature)
|
||||||
return plugins, nil
|
return plugins, nil
|
||||||
}
|
}
|
||||||
plugins = append(plugins, pluginInfo.pluginProbeFunction()...)
|
plugins = append(plugins, pluginInfo.pluginProbeFunction()...)
|
||||||
@ -59,7 +60,7 @@ type pluginInfo struct {
|
|||||||
pluginProbeFunction probeFn
|
pluginProbeFunction probeFn
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendAttachableLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
func appendAttachableLegacyProviderVolumes(logger klog.Logger, allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
||||||
pluginMigrationStatus := make(map[string]pluginInfo)
|
pluginMigrationStatus := make(map[string]pluginInfo)
|
||||||
pluginMigrationStatus[plugins.GCEPDInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationGCE, pluginUnregisterFeature: features.InTreePluginGCEUnregister, pluginProbeFunction: gcepd.ProbeVolumePlugins}
|
pluginMigrationStatus[plugins.GCEPDInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationGCE, pluginUnregisterFeature: features.InTreePluginGCEUnregister, pluginProbeFunction: gcepd.ProbeVolumePlugins}
|
||||||
pluginMigrationStatus[plugins.VSphereInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationvSphere, pluginUnregisterFeature: features.InTreePluginvSphereUnregister, pluginProbeFunction: vsphere_volume.ProbeVolumePlugins}
|
pluginMigrationStatus[plugins.VSphereInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationvSphere, pluginUnregisterFeature: features.InTreePluginvSphereUnregister, pluginProbeFunction: vsphere_volume.ProbeVolumePlugins}
|
||||||
@ -67,7 +68,7 @@ func appendAttachableLegacyProviderVolumes(allPlugins []volume.VolumePlugin, fea
|
|||||||
pluginMigrationStatus[plugins.RBDVolumePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationRBD, pluginUnregisterFeature: features.InTreePluginRBDUnregister, pluginProbeFunction: rbd.ProbeVolumePlugins}
|
pluginMigrationStatus[plugins.RBDVolumePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationRBD, pluginUnregisterFeature: features.InTreePluginRBDUnregister, pluginProbeFunction: rbd.ProbeVolumePlugins}
|
||||||
var err error
|
var err error
|
||||||
for pluginName, pluginInfo := range pluginMigrationStatus {
|
for pluginName, pluginInfo := range pluginMigrationStatus {
|
||||||
allPlugins, err = appendPluginBasedOnFeatureFlags(allPlugins, pluginName, featureGate, pluginInfo)
|
allPlugins, err = appendPluginBasedOnFeatureFlags(logger, allPlugins, pluginName, featureGate, pluginInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return allPlugins, err
|
return allPlugins, err
|
||||||
}
|
}
|
||||||
@ -75,14 +76,14 @@ func appendAttachableLegacyProviderVolumes(allPlugins []volume.VolumePlugin, fea
|
|||||||
return allPlugins, nil
|
return allPlugins, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendExpandableLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
func appendExpandableLegacyProviderVolumes(logger klog.Logger, allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
||||||
return appendLegacyProviderVolumes(allPlugins, featureGate)
|
return appendLegacyProviderVolumes(logger, allPlugins, featureGate)
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
func appendLegacyProviderVolumes(logger klog.Logger, allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
|
||||||
var err error
|
var err error
|
||||||
// First append attachable volumes
|
// First append attachable volumes
|
||||||
allPlugins, err = appendAttachableLegacyProviderVolumes(allPlugins, featureGate)
|
allPlugins, err = appendAttachableLegacyProviderVolumes(logger, allPlugins, featureGate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return allPlugins, err
|
return allPlugins, err
|
||||||
}
|
}
|
||||||
@ -94,7 +95,7 @@ func appendLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate f
|
|||||||
pluginUnregisterFeature: features.InTreePluginAzureFileUnregister,
|
pluginUnregisterFeature: features.InTreePluginAzureFileUnregister,
|
||||||
pluginProbeFunction: azure_file.ProbeVolumePlugins,
|
pluginProbeFunction: azure_file.ProbeVolumePlugins,
|
||||||
}
|
}
|
||||||
allPlugins, err = appendPluginBasedOnFeatureFlags(allPlugins, pluginName, featureGate, pluginInfo)
|
allPlugins, err = appendPluginBasedOnFeatureFlags(logger, allPlugins, pluginName, featureGate, pluginInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return allPlugins, err
|
return allPlugins, err
|
||||||
}
|
}
|
||||||
|
@ -21,12 +21,10 @@ import (
|
|||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/controller-manager/controller"
|
"k8s.io/controller-manager/controller"
|
||||||
"k8s.io/klog/v2"
|
|
||||||
"k8s.io/kubernetes/pkg/controller/clusterroleaggregation"
|
"k8s.io/kubernetes/pkg/controller/clusterroleaggregation"
|
||||||
)
|
)
|
||||||
|
|
||||||
func startClusterRoleAggregrationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startClusterRoleAggregrationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "clusterrole-aggregation-controller"))
|
|
||||||
if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] {
|
if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] {
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
@ -55,17 +55,17 @@ type TestServer struct {
|
|||||||
// enough time to remove temporary files.
|
// enough time to remove temporary files.
|
||||||
func StartTestServer(ctx context.Context, customFlags []string) (result TestServer, err error) {
|
func StartTestServer(ctx context.Context, customFlags []string) (result TestServer, err error) {
|
||||||
logger := klog.FromContext(ctx)
|
logger := klog.FromContext(ctx)
|
||||||
stopCh := make(chan struct{})
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
var errCh chan error
|
var errCh chan error
|
||||||
tearDown := func() {
|
tearDown := func() {
|
||||||
close(stopCh)
|
cancel()
|
||||||
|
|
||||||
// If the kube-controller-manager was started, let's wait for
|
// If the kube-controller-manager was started, let's wait for
|
||||||
// it to shutdown clearly.
|
// it to shutdown cleanly.
|
||||||
if errCh != nil {
|
if errCh != nil {
|
||||||
err, ok := <-errCh
|
err, ok := <-errCh
|
||||||
if ok && err != nil {
|
if ok && err != nil {
|
||||||
klog.Errorf("Failed to shutdown test server clearly: %v", err)
|
logger.Error(err, "Failed to shutdown test server cleanly")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(result.TmpDir) != 0 {
|
if len(result.TmpDir) != 0 {
|
||||||
@ -112,21 +112,23 @@ func StartTestServer(ctx context.Context, customFlags []string) (result TestServ
|
|||||||
}
|
}
|
||||||
|
|
||||||
errCh = make(chan error)
|
errCh = make(chan error)
|
||||||
go func(stopCh <-chan struct{}) {
|
go func(ctx context.Context) {
|
||||||
defer close(errCh)
|
defer close(errCh)
|
||||||
|
|
||||||
if err := app.Run(config.Complete(), stopCh); err != nil {
|
if err := app.Run(ctx, config.Complete()); err != nil {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
}
|
}
|
||||||
}(stopCh)
|
}(ctx)
|
||||||
|
|
||||||
logger.Info("Waiting for /healthz to be ok...")
|
logger.Info("Waiting for /healthz to be ok...")
|
||||||
client, err := kubernetes.NewForConfig(config.LoopbackClientConfig)
|
client, err := kubernetes.NewForConfig(config.LoopbackClientConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, fmt.Errorf("failed to create a client: %v", err)
|
return result, fmt.Errorf("failed to create a client: %v", err)
|
||||||
}
|
}
|
||||||
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
|
err = wait.PollWithContext(ctx, 100*time.Millisecond, 30*time.Second, func(ctx context.Context) (bool, error) {
|
||||||
select {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false, ctx.Err()
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
return false, err
|
return false, err
|
||||||
default:
|
default:
|
||||||
|
@ -29,9 +29,37 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.*
|
|||||||
# TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.*
|
# TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.*
|
||||||
# A few files involved in startup migrated already to contextual
|
# A few files involved in startup migrated already to contextual
|
||||||
# We can't enable contextual logcheck until all are migrated
|
# We can't enable contextual logcheck until all are migrated
|
||||||
contextual k8s.io/kubernetes/test/e2e/dra/.*
|
|
||||||
contextual k8s.io/dynamic-resource-allocation/.*
|
contextual k8s.io/dynamic-resource-allocation/.*
|
||||||
|
contextual k8s.io/kubernetes/cmd/kube-controller-manager/.*
|
||||||
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
|
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
|
||||||
|
contextual k8s.io/kubernetes/pkg/controller/.*
|
||||||
|
contextual k8s.io/kubernetes/test/e2e/dra/.*
|
||||||
|
|
||||||
|
# Most of kube-controller-manager has been converted, but not everything. At
|
||||||
|
# this point it is easier to list the exceptions.
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/certificates/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/controller_utils.go
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/deployment/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/disruption/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/endpoint/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/endpointslice/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/job/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/nodeipam/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/podgc/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/replicaset/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/statefulset/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/testutil/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/util/.*
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing/testvolumespec.go
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/volume/expand/expand_controller.go
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller_test.go
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/volume/persistentvolume/volume_host.go
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go
|
||||||
|
-contextual k8s.io/kubernetes/pkg/controller/volume/pvprotection/pv_protection_controller_test.go
|
||||||
|
|
||||||
# As long as contextual logging is alpha or beta, all WithName, WithValues,
|
# As long as contextual logging is alpha or beta, all WithName, WithValues,
|
||||||
# NewContext calls have to go through klog. Once it is GA, we can lift
|
# NewContext calls have to go through klog. Once it is GA, we can lift
|
||||||
|
@ -1274,7 +1274,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
|
|||||||
case err != nil && statusErr != nil:
|
case err != nil && statusErr != nil:
|
||||||
// If there was an error, and we failed to update status,
|
// If there was an error, and we failed to update status,
|
||||||
// log it and return the original error.
|
// log it and return the original error.
|
||||||
klog.Error(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
|
logger.Error(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
|
||||||
return err
|
return err
|
||||||
case err != nil:
|
case err != nil:
|
||||||
return err
|
return err
|
||||||
|
@ -155,8 +155,7 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
|
|||||||
logger.Info("Starting controller", "controller", "garbagecollector")
|
logger.Info("Starting controller", "controller", "garbagecollector")
|
||||||
defer logger.Info("Shutting down controller", "controller", "garbagecollector")
|
defer logger.Info("Shutting down controller", "controller", "garbagecollector")
|
||||||
|
|
||||||
graphLogger := klog.LoggerWithName(logger, "graphbuilder")
|
go gc.dependencyGraphBuilder.Run(ctx)
|
||||||
go gc.dependencyGraphBuilder.Run(klog.NewContext(ctx, graphLogger))
|
|
||||||
|
|
||||||
if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
|
if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
|
||||||
return gc.dependencyGraphBuilder.IsSynced(logger)
|
return gc.dependencyGraphBuilder.IsSynced(logger)
|
||||||
|
@ -307,7 +307,7 @@ func NewNodeLifecycleController(
|
|||||||
largeClusterThreshold int32,
|
largeClusterThreshold int32,
|
||||||
unhealthyZoneThreshold float32,
|
unhealthyZoneThreshold float32,
|
||||||
) (*Controller, error) {
|
) (*Controller, error) {
|
||||||
logger := klog.LoggerWithName(klog.FromContext(ctx), "NodeLifecycleController")
|
logger := klog.FromContext(ctx)
|
||||||
if kubeClient == nil {
|
if kubeClient == nil {
|
||||||
logger.Error(nil, "kubeClient is nil when starting nodelifecycle Controller")
|
logger.Error(nil, "kubeClient is nil when starting nodelifecycle Controller")
|
||||||
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
|
||||||
|
@ -227,8 +227,9 @@ func (ec *Controller) Run(ctx context.Context, workers int) {
|
|||||||
defer runtime.HandleCrash()
|
defer runtime.HandleCrash()
|
||||||
defer ec.queue.ShutDown()
|
defer ec.queue.ShutDown()
|
||||||
|
|
||||||
klog.Infof("Starting ephemeral volume controller")
|
logger := klog.FromContext(ctx)
|
||||||
defer klog.Infof("Shutting down ephemeral volume controller")
|
logger.Info("Starting ephemeral volume controller")
|
||||||
|
defer logger.Info("Shutting down ephemeral volume controller")
|
||||||
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
eventBroadcaster.StartLogging(klog.Infof)
|
eventBroadcaster.StartLogging(klog.Infof)
|
||||||
|
@ -166,13 +166,14 @@ func (e *TokensController) Run(ctx context.Context, workers int) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.FromContext(ctx).V(5).Info("Starting workers")
|
logger := klog.FromContext(ctx)
|
||||||
|
logger.V(5).Info("Starting workers")
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
go wait.Until(e.syncServiceAccount, 0, ctx.Done())
|
go wait.UntilWithContext(ctx, e.syncServiceAccount, 0)
|
||||||
go wait.Until(e.syncSecret, 0, ctx.Done())
|
go wait.UntilWithContext(ctx, e.syncSecret, 0)
|
||||||
}
|
}
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
klog.FromContext(ctx).V(1).Info("Shutting down")
|
logger.V(1).Info("Shutting down")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TokensController) queueServiceAccountSync(obj interface{}) {
|
func (e *TokensController) queueServiceAccountSync(obj interface{}) {
|
||||||
@ -188,7 +189,7 @@ func (e *TokensController) queueServiceAccountUpdateSync(oldObj interface{}, new
|
|||||||
}
|
}
|
||||||
|
|
||||||
// complete optionally requeues key, then calls queue.Done(key)
|
// complete optionally requeues key, then calls queue.Done(key)
|
||||||
func (e *TokensController) retryOrForget(queue workqueue.RateLimitingInterface, key interface{}, requeue bool) {
|
func (e *TokensController) retryOrForget(logger klog.Logger, queue workqueue.RateLimitingInterface, key interface{}, requeue bool) {
|
||||||
if !requeue {
|
if !requeue {
|
||||||
queue.Forget(key)
|
queue.Forget(key)
|
||||||
return
|
return
|
||||||
@ -200,7 +201,7 @@ func (e *TokensController) retryOrForget(queue workqueue.RateLimitingInterface,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("retried %d times: %#v", requeueCount, key)
|
logger.V(4).Info("retried several times", "key", key, "count", requeueCount)
|
||||||
queue.Forget(key)
|
queue.Forget(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -216,8 +217,8 @@ func (e *TokensController) queueSecretUpdateSync(oldObj interface{}, newObj inte
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TokensController) syncServiceAccount() {
|
func (e *TokensController) syncServiceAccount(ctx context.Context) {
|
||||||
logger := klog.FromContext(context.TODO())
|
logger := klog.FromContext(ctx)
|
||||||
key, quit := e.syncServiceAccountQueue.Get()
|
key, quit := e.syncServiceAccountQueue.Get()
|
||||||
if quit {
|
if quit {
|
||||||
return
|
return
|
||||||
@ -226,7 +227,7 @@ func (e *TokensController) syncServiceAccount() {
|
|||||||
|
|
||||||
retry := false
|
retry := false
|
||||||
defer func() {
|
defer func() {
|
||||||
e.retryOrForget(e.syncServiceAccountQueue, key, retry)
|
e.retryOrForget(logger, e.syncServiceAccountQueue, key, retry)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
saInfo, err := parseServiceAccountKey(key)
|
saInfo, err := parseServiceAccountKey(key)
|
||||||
@ -251,20 +252,20 @@ func (e *TokensController) syncServiceAccount() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TokensController) syncSecret() {
|
func (e *TokensController) syncSecret(ctx context.Context) {
|
||||||
key, quit := e.syncSecretQueue.Get()
|
key, quit := e.syncSecretQueue.Get()
|
||||||
if quit {
|
if quit {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer e.syncSecretQueue.Done(key)
|
defer e.syncSecretQueue.Done(key)
|
||||||
|
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
// Track whether or not we should retry this sync
|
// Track whether or not we should retry this sync
|
||||||
retry := false
|
retry := false
|
||||||
defer func() {
|
defer func() {
|
||||||
e.retryOrForget(e.syncSecretQueue, key, retry)
|
e.retryOrForget(logger, e.syncSecretQueue, key, retry)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
logger := klog.FromContext(context.TODO())
|
|
||||||
secretInfo, err := parseSecretQueueKey(key)
|
secretInfo, err := parseSecretQueueKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(err, "Parsing secret queue key")
|
logger.Error(err, "Parsing secret queue key")
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
|
"k8s.io/kubernetes/test/utils/ktesting"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testGenerator struct {
|
type testGenerator struct {
|
||||||
@ -438,6 +439,8 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
for k, tc := range testcases {
|
for k, tc := range testcases {
|
||||||
t.Run(k, func(t *testing.T) {
|
t.Run(k, func(t *testing.T) {
|
||||||
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
|
||||||
// Re-seed to reset name generation
|
// Re-seed to reset name generation
|
||||||
utilrand.Seed(1)
|
utilrand.Seed(1)
|
||||||
|
|
||||||
@ -497,10 +500,10 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
if controller.syncServiceAccountQueue.Len() > 0 {
|
if controller.syncServiceAccountQueue.Len() > 0 {
|
||||||
controller.syncServiceAccount()
|
controller.syncServiceAccount(ctx)
|
||||||
}
|
}
|
||||||
if controller.syncSecretQueue.Len() > 0 {
|
if controller.syncSecretQueue.Len() > 0 {
|
||||||
controller.syncSecret()
|
controller.syncSecret(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The queues still have things to work on
|
// The queues still have things to work on
|
||||||
|
Loading…
Reference in New Issue
Block a user