extract electAndRun to a top-level func.

This commit is contained in:
Indeed 2021-03-09 10:22:05 -08:00
parent 721b1822d6
commit 3362918f8f
2 changed files with 67 additions and 60 deletions

View File

@ -243,32 +243,6 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// add a uniquifier so that two processes on the same host don't accidentally both become active // add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID()) id = id + "_" + string(uuid.NewUUID())
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
leaderElectAndRun := func(resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
leaseName,
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: c.EventRecorder,
},
c.Kubeconfig,
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: callbacks,
WatchDog: electionChecker,
Name: leaseName,
})
}
// If leader migration is enabled, use the redirected initialization // If leader migration is enabled, use the redirected initialization
// Check feature gate and configuration separately so that any error in configuration checking will not // Check feature gate and configuration separately so that any error in configuration checking will not
// affect the result if the feature is not enabled. // affect the result if the feature is not enabled.
@ -289,7 +263,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
} }
// Start the main lock, using LeaderElectionConfiguration for the type and name of the lease. // Start the main lock, using LeaderElectionConfiguration for the type and name of the lease.
go leaderElectAndRun( go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName, c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{ leaderelection.LeaderCallbacks{
@ -310,7 +284,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
<-saTokenControllerStarted <-saTokenControllerStarted
// Start the migration lock, using LeaderMigrationConfiguration for the type and name of the lease. // Start the migration lock, using LeaderMigrationConfiguration for the type and name of the lease.
leaderElectAndRun( leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderMigration.ResourceLock, c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
c.ComponentConfig.Generic.LeaderMigration.LeaderName, c.ComponentConfig.Generic.LeaderMigration.LeaderName,
leaderelection.LeaderCallbacks{ leaderelection.LeaderCallbacks{
@ -327,7 +301,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
} // end of leader migration } // end of leader migration
// Normal leader election, without leader migration // Normal leader election, without leader migration
leaderElectAndRun( leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName, c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{ leaderelection.LeaderCallbacks{
@ -690,6 +664,35 @@ func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilde
return return
} }
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
// TODO: extract this function into staging/controller-manager
func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
leaseName,
resourcelock.ResourceLockConfig{
Identity: lockIdentity,
EventRecorder: c.EventRecorder,
},
c.Kubeconfig,
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: callbacks,
WatchDog: electionChecker,
Name: leaseName,
})
panic("unreachable")
}
// filterInitializers returns initializers that has filterFunc(name) == true. // filterInitializers returns initializers that has filterFunc(name) == true.
// filterFunc can be nil, in which case the original initializers will be returned directly. // filterFunc can be nil, in which case the original initializers will be returned directly.
func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc) map[string]InitFunc { func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc) map[string]InitFunc {

View File

@ -200,34 +200,6 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
// add a uniquifier so that two processes on the same host don't accidentally both become active // add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID()) id = id + "_" + string(uuid.NewUUID())
// leaderElectAndRun runs the callbacks once the leader lease is acquired.
leaderElectAndRun := func(resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
// Lock required for leader election
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
leaseName,
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: c.EventRecorder,
},
c.Kubeconfig,
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}
// Try and become the leader and start cloud controller manager loops
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: callbacks,
WatchDog: electionChecker,
Name: leaseName,
})
}
// If leader migration is enabled, use the redirected initialization // If leader migration is enabled, use the redirected initialization
// Check feature gate and configuration separately so that any error in configuration checking will not // Check feature gate and configuration separately so that any error in configuration checking will not
// affect the result if the feature is not enabled. // affect the result if the feature is not enabled.
@ -245,7 +217,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
mainLockAcquired := make(chan struct{}) mainLockAcquired := make(chan struct{})
// Start the main lock. // Start the main lock.
go leaderElectAndRun(c.ComponentConfig.Generic.LeaderElection.ResourceLock, go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName, c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{ leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) { OnStartedLeading: func(ctx context.Context) {
@ -263,7 +236,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
<-mainLockAcquired <-mainLockAcquired
// Start the migration lock. // Start the migration lock.
leaderElectAndRun(c.ComponentConfig.Generic.LeaderMigration.ResourceLock, leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
c.ComponentConfig.Generic.LeaderMigration.LeaderName, c.ComponentConfig.Generic.LeaderMigration.LeaderName,
leaderelection.LeaderCallbacks{ leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) { OnStartedLeading: func(ctx context.Context) {
@ -279,7 +253,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
} // end of leader migration } // end of leader migration
// Normal leader election, without leader migration // Normal leader election, without leader migration
leaderElectAndRun(c.ComponentConfig.Generic.LeaderElection.ResourceLock, leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName, c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{ leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) { OnStartedLeading: func(ctx context.Context) {
@ -480,6 +455,35 @@ func ResyncPeriod(c *cloudcontrollerconfig.CompletedConfig) func() time.Duration
} }
} }
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
// TODO: extract this function into staging/controller-manager
func leaderElectAndRun(c *cloudcontrollerconfig.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
leaseName,
resourcelock.ResourceLockConfig{
Identity: lockIdentity,
EventRecorder: c.EventRecorder,
},
c.Kubeconfig,
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
if err != nil {
klog.Fatalf("error creating lock: %v", err)
}
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: callbacks,
WatchDog: electionChecker,
Name: leaseName,
})
panic("unreachable")
}
// devideInitializers applies filterFunc to each of the given intializiers. // devideInitializers applies filterFunc to each of the given intializiers.
// filtered contains all controllers that have filterFunc(name) == true // filtered contains all controllers that have filterFunc(name) == true
// rest contains all controllers that have filterFunc(name) == false // rest contains all controllers that have filterFunc(name) == false