From 3362918f8f12f93c7316cb8ecd464c5325f0c6a7 Mon Sep 17 00:00:00 2001 From: Indeed Date: Tue, 9 Mar 2021 10:22:05 -0800 Subject: [PATCH] extract electAndRun to a top-level func. --- .../app/controllermanager.go | 61 +++++++++-------- .../cloud-provider/app/controllermanager.go | 66 ++++++++++--------- 2 files changed, 67 insertions(+), 60 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 22ab003ade3..3f0d248ecb4 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -243,32 +243,6 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { // add a uniquifier so that two processes on the same host don't accidentally both become active id = id + "_" + string(uuid.NewUUID()) - // leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired. - leaderElectAndRun := func(resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) { - rl, err := resourcelock.NewFromKubeconfig(resourceLock, - c.ComponentConfig.Generic.LeaderElection.ResourceNamespace, - leaseName, - resourcelock.ResourceLockConfig{ - Identity: id, - EventRecorder: c.EventRecorder, - }, - c.Kubeconfig, - c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration) - if err != nil { - klog.Fatalf("error creating lock: %v", err) - } - - leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration, - RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration, - RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration, - Callbacks: callbacks, - WatchDog: electionChecker, - Name: leaseName, - }) - } - // If leader migration is enabled, use the redirected initialization // Check feature gate and configuration separately so that any error in configuration checking will not // affect the result if the feature is not enabled. @@ -289,7 +263,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { } // Start the main lock, using LeaderElectionConfiguration for the type and name of the lease. - go leaderElectAndRun( + go leaderElectAndRun(c, id, electionChecker, c.ComponentConfig.Generic.LeaderElection.ResourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceName, leaderelection.LeaderCallbacks{ @@ -310,7 +284,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { <-saTokenControllerStarted // Start the migration lock, using LeaderMigrationConfiguration for the type and name of the lease. - leaderElectAndRun( + leaderElectAndRun(c, id, electionChecker, c.ComponentConfig.Generic.LeaderMigration.ResourceLock, c.ComponentConfig.Generic.LeaderMigration.LeaderName, leaderelection.LeaderCallbacks{ @@ -327,7 +301,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { } // end of leader migration // Normal leader election, without leader migration - leaderElectAndRun( + leaderElectAndRun(c, id, electionChecker, c.ComponentConfig.Generic.LeaderElection.ResourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceName, leaderelection.LeaderCallbacks{ @@ -690,6 +664,35 @@ func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilde return } +// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired. +// TODO: extract this function into staging/controller-manager +func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) { + rl, err := resourcelock.NewFromKubeconfig(resourceLock, + c.ComponentConfig.Generic.LeaderElection.ResourceNamespace, + leaseName, + resourcelock.ResourceLockConfig{ + Identity: lockIdentity, + EventRecorder: c.EventRecorder, + }, + c.Kubeconfig, + c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration) + if err != nil { + klog.Fatalf("error creating lock: %v", err) + } + + leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration, + RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration, + RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration, + Callbacks: callbacks, + WatchDog: electionChecker, + Name: leaseName, + }) + + panic("unreachable") +} + // filterInitializers returns initializers that has filterFunc(name) == true. // filterFunc can be nil, in which case the original initializers will be returned directly. func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc) map[string]InitFunc { diff --git a/staging/src/k8s.io/cloud-provider/app/controllermanager.go b/staging/src/k8s.io/cloud-provider/app/controllermanager.go index 599367843ae..9d72a76ee21 100644 --- a/staging/src/k8s.io/cloud-provider/app/controllermanager.go +++ b/staging/src/k8s.io/cloud-provider/app/controllermanager.go @@ -200,34 +200,6 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface // add a uniquifier so that two processes on the same host don't accidentally both become active id = id + "_" + string(uuid.NewUUID()) - // leaderElectAndRun runs the callbacks once the leader lease is acquired. - leaderElectAndRun := func(resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) { - // Lock required for leader election - rl, err := resourcelock.NewFromKubeconfig(resourceLock, - c.ComponentConfig.Generic.LeaderElection.ResourceNamespace, - leaseName, - resourcelock.ResourceLockConfig{ - Identity: id, - EventRecorder: c.EventRecorder, - }, - c.Kubeconfig, - c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration) - if err != nil { - klog.Fatalf("error creating lock: %v", err) - } - - // Try and become the leader and start cloud controller manager loops - leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration, - RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration, - RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration, - Callbacks: callbacks, - WatchDog: electionChecker, - Name: leaseName, - }) - } - // If leader migration is enabled, use the redirected initialization // Check feature gate and configuration separately so that any error in configuration checking will not // affect the result if the feature is not enabled. @@ -245,7 +217,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface mainLockAcquired := make(chan struct{}) // Start the main lock. - go leaderElectAndRun(c.ComponentConfig.Generic.LeaderElection.ResourceLock, + go leaderElectAndRun(c, id, electionChecker, + c.ComponentConfig.Generic.LeaderElection.ResourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceName, leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { @@ -263,7 +236,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface <-mainLockAcquired // Start the migration lock. - leaderElectAndRun(c.ComponentConfig.Generic.LeaderMigration.ResourceLock, + leaderElectAndRun(c, id, electionChecker, + c.ComponentConfig.Generic.LeaderMigration.ResourceLock, c.ComponentConfig.Generic.LeaderMigration.LeaderName, leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { @@ -279,7 +253,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface } // end of leader migration // Normal leader election, without leader migration - leaderElectAndRun(c.ComponentConfig.Generic.LeaderElection.ResourceLock, + leaderElectAndRun(c, id, electionChecker, + c.ComponentConfig.Generic.LeaderElection.ResourceLock, c.ComponentConfig.Generic.LeaderElection.ResourceName, leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { @@ -480,6 +455,35 @@ func ResyncPeriod(c *cloudcontrollerconfig.CompletedConfig) func() time.Duration } } +// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired. +// TODO: extract this function into staging/controller-manager +func leaderElectAndRun(c *cloudcontrollerconfig.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) { + rl, err := resourcelock.NewFromKubeconfig(resourceLock, + c.ComponentConfig.Generic.LeaderElection.ResourceNamespace, + leaseName, + resourcelock.ResourceLockConfig{ + Identity: lockIdentity, + EventRecorder: c.EventRecorder, + }, + c.Kubeconfig, + c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration) + if err != nil { + klog.Fatalf("error creating lock: %v", err) + } + + leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration, + RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration, + RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration, + Callbacks: callbacks, + WatchDog: electionChecker, + Name: leaseName, + }) + + panic("unreachable") +} + // devideInitializers applies filterFunc to each of the given intializiers. // filtered contains all controllers that have filterFunc(name) == true // rest contains all controllers that have filterFunc(name) == false