extract common code for the main lock.

This commit is contained in:
Indeed 2021-03-09 11:17:49 -08:00
parent 3362918f8f
commit e8479414ab
4 changed files with 90 additions and 90 deletions

View File

@ -243,48 +243,57 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())
// If leader migration is enabled, use the redirected initialization
// Check feature gate and configuration separately so that any error in configuration checking will not
// affect the result if the feature is not enabled.
if leadermigration.FeatureEnabled() && leadermigration.Enabled(&c.ComponentConfig.Generic) {
// leaderMigrator will be non-nil if and only if Leader Migration is enabled.
var leaderMigrator *leadermigration.LeaderMigrator = nil
// startSATokenController will be original saTokenControllerInitFunc if leader migration is not enabled.
startSATokenController := saTokenControllerInitFunc
// If leader migration is enabled, create the LeaderMigrator and prepare for migration
if leadermigration.Enabled(&c.ComponentConfig.Generic) {
klog.Infof("starting leader migration")
leaderMigrator := leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
"kube-controller-manager")
// We need a channel to signal the start of Service Account Token Controller
// all migrated channel must start afterwards.
saTokenControllerStarted := make(chan struct{})
// Wrap saTokenControllerInitFunc to close the channel after returning.
wrappedSATokenControllerInitFunc := func(ctx ControllerContext) (http.Handler, bool, error) {
defer close(saTokenControllerStarted)
// Wrap saTokenControllerInitFunc to signal readiness for migration after starting
// the controller.
startSATokenController = func(ctx ControllerContext) (http.Handler, bool, error) {
defer close(leaderMigrator.MigrationReady)
return saTokenControllerInitFunc(ctx)
}
}
// Start the main lock, using LeaderElectionConfiguration for the type and name of the lease.
go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// Start the main lock
go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
var filterFunc leadermigration.FilterFunc = nil
if leaderMigrator != nil {
// If leader migration is enabled, we should start only non-migrated controllers
// for the main lock.
filterFunc = leaderMigrator.FilterFunc(false)
klog.Info("leader migration: starting main controllers.")
// saTokenController should only run under the main lock
run(ctx, wrappedSATokenControllerInitFunc, leaderMigrator.FilterFunc(false))
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
})
}
run(ctx, startSATokenController, filterFunc)
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
})
// If Leader Migration is enabled, proceed to attempt the migration lock.
if leaderMigrator != nil {
// Wait for Service Account Token Controller to start before acquiring the migration lock.
// At this point, the main lock must have already been acquired, or the KCM process already exited.
// We wait for the main lock before acquiring the migration lock to prevent the situation
// where KCM instance A holds the main lock while KCM instance B holds the migration lock.
<-saTokenControllerStarted
<-leaderMigrator.MigrationReady
// Start the migration lock, using LeaderMigrationConfiguration for the type and name of the lease.
leaderElectAndRun(c, id, electionChecker,
// Start the migration lock.
go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
c.ComponentConfig.Generic.LeaderMigration.LeaderName,
leaderelection.LeaderCallbacks{
@ -297,23 +306,9 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
klog.Fatalf("migration leaderelection lost")
},
})
panic("unreachable")
} // end of leader migration
}
// Normal leader election, without leader migration
leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
run(ctx, saTokenControllerInitFunc, nil)
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
})
panic("unreachable")
select {}
}
// ControllerContext defines the context object for controller
@ -695,6 +690,7 @@ func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionC
// filterInitializers returns initializers that has filterFunc(name) == true.
// filterFunc can be nil, in which case the original initializers will be returned directly.
// InitFunc is local to kube-controller-manager, and thus filterInitializers has to be local too.
func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc) map[string]InitFunc {
if filterFunc == nil {
return allInitializers

View File

@ -200,43 +200,55 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())
// leaderMigrator will be non-nil if and only if Leader Migration is enabled.
var leaderMigrator *leadermigration.LeaderMigrator = nil
// Leader migration requires splitting initializers for the main and the migration lock
// If leader migration is not enabled, these two will be unused.
var migratedInitializers, unmigratedInitializers map[string]InitFunc
// If leader migration is enabled, use the redirected initialization
// Check feature gate and configuration separately so that any error in configuration checking will not
// affect the result if the feature is not enabled.
if leadermigration.FeatureEnabled() && leadermigration.Enabled(&c.ComponentConfig.Generic) {
if leadermigration.Enabled(&c.ComponentConfig.Generic) {
klog.Info("starting leader migration")
leaderMigrator := leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
"cloud-controller-manager")
// Split initializers based on which lease they require, main lease or migration lease.
migratedInitializers, unmigratedInitializers := devideInitializers(controllerInitializers, leaderMigrator.FilterFunc(true))
migratedInitializers, unmigratedInitializers = divideInitializers(controllerInitializers, leaderMigrator.FilterFunc(true))
}
// We need to signal acquiring the main lock before attempting to acquire the migration lock
// so that no instance will hold the migration lock but not the main lock at any point.
mainLockAcquired := make(chan struct{})
// Start the main lock.
go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// Start the main lock
go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
initializers := controllerInitializers
if leaderMigrator != nil {
// If leader migration is enabled, we should start only non-migrated controllers
// for the main lock.
initializers = unmigratedInitializers
klog.Info("leader migration: starting main controllers.")
// signal acquiring the main lock
close(mainLockAcquired)
run(ctx, unmigratedInitializers)
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
})
// Signal the main lock is acquired, and thus migration lock is ready to attempt.
close(leaderMigrator.MigrationReady)
}
run(ctx, initializers)
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
})
// Wait for the main lease to be acquired before attempting the migration lease.
<-mainLockAcquired
// If Leader Migration is enabled, proceed to attempt the migration lock.
if leaderMigrator != nil {
// Wait for the signal of main lock being acquired.
<-leaderMigrator.MigrationReady
// Start the migration lock.
leaderElectAndRun(c, id, electionChecker,
go leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
c.ComponentConfig.Generic.LeaderMigration.LeaderName,
leaderelection.LeaderCallbacks{
@ -248,24 +260,9 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
klog.Fatalf("migration leaderelection lost")
},
})
}
panic("unreachable")
} // end of leader migration
// Normal leader election, without leader migration
leaderElectAndRun(c, id, electionChecker,
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceName,
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
run(ctx, controllerInitializers)
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
})
panic("unreachable")
select {}
}
// startControllers starts the cloud specific controller loops.
@ -484,10 +481,11 @@ func leaderElectAndRun(c *cloudcontrollerconfig.CompletedConfig, lockIdentity st
panic("unreachable")
}
// devideInitializers applies filterFunc to each of the given intializiers.
// filtered contains all controllers that have filterFunc(name) == true
// rest contains all controllers that have filterFunc(name) == false
func devideInitializers(allInitializers map[string]InitFunc,
// divideInitializers applies filterFunc to each of the given intializiers.
// filtered contains all controllers that have filterFunc(name) == true
// rest contains all controllers that have filterFunc(name) == false
// InitFunc is local to cloud-controller-manager, and thus divideInitializers has to be local too.
func divideInitializers(allInitializers map[string]InitFunc,
filterFunc leadermigration.FilterFunc) (filtered map[string]InitFunc, rest map[string]InitFunc) {
if filterFunc == nil {
return make(map[string]InitFunc), allInitializers

View File

@ -22,6 +22,10 @@ import (
// LeaderMigrator holds information required by the leader migration process.
type LeaderMigrator struct {
// MigrationReady is closed after the coontroller manager finishes preparing for the migration lock.
// After this point, the leader migration process will proceed to acquire the migration lock.
MigrationReady chan struct{}
config *internal.LeaderMigrationConfiguration
migratedControllers map[string]bool
// component indicates the name of the control-plane component that uses leader migration,
@ -41,6 +45,7 @@ func NewLeaderMigrator(config *internal.LeaderMigrationConfiguration, component
migratedControllers[leader.Name] = leader.Component == component
}
return &LeaderMigrator{
MigrationReady: make(chan struct{}),
config: config,
migratedControllers: migratedControllers,
component: component,

View File

@ -19,6 +19,7 @@ package leadermigration
import config "k8s.io/controller-manager/config"
// Enabled checks whether Leader Migration should be enabled, given the GenericControllerManagerConfiguration.
// It considers the feature gate first, and will always return false if the feature gate is not enabled.
func Enabled(genericConfig *config.GenericControllerManagerConfiguration) bool {
return genericConfig.LeaderElection.LeaderElect && genericConfig.LeaderMigrationEnabled
return FeatureEnabled() && genericConfig.LeaderElection.LeaderElect && genericConfig.LeaderMigrationEnabled
}