mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #96541 from jiahuif/feature/leader-migration/initial-implementation
(KEP-2436) implementation of leader migration for controller manager.
This commit is contained in:
commit
2195718940
@ -61,6 +61,7 @@ import (
|
|||||||
genericcontrollermanager "k8s.io/controller-manager/app"
|
genericcontrollermanager "k8s.io/controller-manager/app"
|
||||||
"k8s.io/controller-manager/pkg/clientbuilder"
|
"k8s.io/controller-manager/pkg/clientbuilder"
|
||||||
"k8s.io/controller-manager/pkg/informerfactory"
|
"k8s.io/controller-manager/pkg/informerfactory"
|
||||||
|
"k8s.io/controller-manager/pkg/leadermigration"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
|
"k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
|
||||||
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
|
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
|
||||||
@ -207,32 +208,18 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
run := func(ctx context.Context) {
|
clientBuilder, rootClientBuilder := createClientBuilders(c)
|
||||||
rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{
|
|
||||||
ClientConfig: c.Kubeconfig,
|
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
|
||||||
}
|
|
||||||
var clientBuilder clientbuilder.ControllerClientBuilder
|
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
|
||||||
if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
|
|
||||||
if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
|
|
||||||
// It's possible another controller process is creating the tokens for us.
|
|
||||||
// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
|
|
||||||
klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
|
|
||||||
}
|
|
||||||
|
|
||||||
clientBuilder = clientbuilder.NewDynamicClientBuilder(
|
|
||||||
restclient.AnonymousClientConfig(c.Kubeconfig),
|
|
||||||
c.Client.CoreV1(),
|
|
||||||
metav1.NamespaceSystem)
|
|
||||||
} else {
|
|
||||||
clientBuilder = rootClientBuilder
|
|
||||||
}
|
|
||||||
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
|
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("error building controller context: %v", err)
|
klog.Fatalf("error building controller context: %v", err)
|
||||||
}
|
}
|
||||||
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
|
controllerInitializers := initializersFunc(controllerContext.LoopMode)
|
||||||
|
if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux); err != nil {
|
||||||
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
|
|
||||||
klog.Fatalf("error starting controllers: %v", err)
|
klog.Fatalf("error starting controllers: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,8 +230,9 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// No leader election, run directly
|
||||||
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
||||||
run(context.TODO())
|
run(context.TODO(), saTokenControllerInitFunc, NewControllerInitializers)
|
||||||
panic("unreachable")
|
panic("unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -256,34 +244,72 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
|||||||
// add a uniquifier so that two processes on the same host don't accidentally both become active
|
// add a uniquifier so that two processes on the same host don't accidentally both become active
|
||||||
id = id + "_" + string(uuid.NewUUID())
|
id = id + "_" + string(uuid.NewUUID())
|
||||||
|
|
||||||
rl, err := resourcelock.NewFromKubeconfig(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
|
// leaderMigrator will be non-nil if and only if Leader Migration is enabled.
|
||||||
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
|
var leaderMigrator *leadermigration.LeaderMigrator = nil
|
||||||
c.ComponentConfig.Generic.LeaderElection.ResourceName,
|
|
||||||
resourcelock.ResourceLockConfig{
|
// startSATokenController will be original saTokenControllerInitFunc if leader migration is not enabled.
|
||||||
Identity: id,
|
startSATokenController := saTokenControllerInitFunc
|
||||||
EventRecorder: c.EventRecorder,
|
|
||||||
},
|
// If leader migration is enabled, create the LeaderMigrator and prepare for migration
|
||||||
c.Kubeconfig,
|
if leadermigration.Enabled(&c.ComponentConfig.Generic) {
|
||||||
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
|
klog.Infof("starting leader migration")
|
||||||
if err != nil {
|
|
||||||
klog.Fatalf("error creating lock: %v", err)
|
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
|
||||||
|
"kube-controller-manager")
|
||||||
|
|
||||||
|
// Wrap saTokenControllerInitFunc to signal readiness for migration after starting
|
||||||
|
// the controller.
|
||||||
|
startSATokenController = func(ctx ControllerContext) (http.Handler, bool, error) {
|
||||||
|
defer close(leaderMigrator.MigrationReady)
|
||||||
|
return saTokenControllerInitFunc(ctx)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
|
// Start the main lock
|
||||||
Lock: rl,
|
go leaderElectAndRun(c, id, electionChecker,
|
||||||
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
|
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
|
||||||
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
|
c.ComponentConfig.Generic.LeaderElection.ResourceName,
|
||||||
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
|
leaderelection.LeaderCallbacks{
|
||||||
Callbacks: leaderelection.LeaderCallbacks{
|
OnStartedLeading: func(ctx context.Context) {
|
||||||
OnStartedLeading: run,
|
initializersFunc := NewControllerInitializers
|
||||||
|
if leaderMigrator != nil {
|
||||||
|
// If leader migration is enabled, we should start only non-migrated controllers
|
||||||
|
// for the main lock.
|
||||||
|
initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
|
||||||
|
klog.Info("leader migration: starting main controllers.")
|
||||||
|
}
|
||||||
|
run(ctx, startSATokenController, initializersFunc)
|
||||||
|
},
|
||||||
OnStoppedLeading: func() {
|
OnStoppedLeading: func() {
|
||||||
klog.Fatalf("leaderelection lost")
|
klog.Fatalf("leaderelection lost")
|
||||||
},
|
},
|
||||||
},
|
|
||||||
WatchDog: electionChecker,
|
|
||||||
Name: "kube-controller-manager",
|
|
||||||
})
|
})
|
||||||
panic("unreachable")
|
|
||||||
|
// If Leader Migration is enabled, proceed to attempt the migration lock.
|
||||||
|
if leaderMigrator != nil {
|
||||||
|
// Wait for Service Account Token Controller to start before acquiring the migration lock.
|
||||||
|
// At this point, the main lock must have already been acquired, or the KCM process already exited.
|
||||||
|
// We wait for the main lock before acquiring the migration lock to prevent the situation
|
||||||
|
// where KCM instance A holds the main lock while KCM instance B holds the migration lock.
|
||||||
|
<-leaderMigrator.MigrationReady
|
||||||
|
|
||||||
|
// Start the migration lock.
|
||||||
|
go leaderElectAndRun(c, id, electionChecker,
|
||||||
|
c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
|
||||||
|
c.ComponentConfig.Generic.LeaderMigration.LeaderName,
|
||||||
|
leaderelection.LeaderCallbacks{
|
||||||
|
OnStartedLeading: func(ctx context.Context) {
|
||||||
|
klog.Info("leader migration: starting migrated controllers.")
|
||||||
|
// DO NOT start saTokenController under migration lock
|
||||||
|
run(ctx, nil, createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
|
||||||
|
},
|
||||||
|
OnStoppedLeading: func() {
|
||||||
|
klog.Fatalf("migration leaderelection lost")
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ControllerContext defines the context object for controller
|
// ControllerContext defines the context object for controller
|
||||||
@ -343,6 +369,12 @@ func (c ControllerContext) IsControllerEnabled(name string) bool {
|
|||||||
// The bool indicates whether the controller was enabled.
|
// The bool indicates whether the controller was enabled.
|
||||||
type InitFunc func(ctx ControllerContext) (debuggingHandler http.Handler, enabled bool, err error)
|
type InitFunc func(ctx ControllerContext) (debuggingHandler http.Handler, enabled bool, err error)
|
||||||
|
|
||||||
|
// ControllerInitializersFunc returns the controller initializers, keyed by controller
|
||||||
|
// name, that apply to the given loopMode.
|
||||||
|
type ControllerInitializersFunc func(loopMode ControllerLoopMode) (initializers map[string]InitFunc)
|
||||||
|
|
||||||
|
var _ ControllerInitializersFunc = NewControllerInitializers
|
||||||
|
|
||||||
// KnownControllers returns the names of all known controllers
|
// KnownControllers returns the names of all known controllers
|
||||||
func KnownControllers() []string {
|
func KnownControllers() []string {
|
||||||
ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
|
ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
|
||||||
@ -504,9 +536,11 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
|
|||||||
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
|
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
|
||||||
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
|
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
|
||||||
// If this fails, just return here and fail since other controllers won't be able to get credentials.
|
// If this fails, just return here and fail since other controllers won't be able to get credentials.
|
||||||
|
if startSATokenController != nil {
|
||||||
if _, _, err := startSATokenController(ctx); err != nil {
|
if _, _, err := startSATokenController(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize the cloud provider with a reference to the clientBuilder only after token controller
|
// Initialize the cloud provider with a reference to the clientBuilder only after token controller
|
||||||
// has started in case the cloud provider uses the client builder.
|
// has started in case the cloud provider uses the client builder.
|
||||||
@ -609,3 +643,68 @@ func readCA(file string) ([]byte, error) {
|
|||||||
|
|
||||||
return rootCA, err
|
return rootCA, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createClientBuilders creates clientBuilder and rootClientBuilder from the given configuration
|
||||||
|
func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilder.ControllerClientBuilder, rootClientBuilder clientbuilder.ControllerClientBuilder) {
|
||||||
|
rootClientBuilder = clientbuilder.SimpleControllerClientBuilder{
|
||||||
|
ClientConfig: c.Kubeconfig,
|
||||||
|
}
|
||||||
|
if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
|
||||||
|
if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
|
||||||
|
// It's possible another controller process is creating the tokens for us.
|
||||||
|
// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
|
||||||
|
klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
|
||||||
|
}
|
||||||
|
|
||||||
|
clientBuilder = clientbuilder.NewDynamicClientBuilder(
|
||||||
|
restclient.AnonymousClientConfig(c.Kubeconfig),
|
||||||
|
c.Client.CoreV1(),
|
||||||
|
metav1.NamespaceSystem)
|
||||||
|
} else {
|
||||||
|
clientBuilder = rootClientBuilder
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
|
||||||
|
// TODO: extract this function into staging/controller-manager
|
||||||
|
func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
|
||||||
|
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
|
||||||
|
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
|
||||||
|
leaseName,
|
||||||
|
resourcelock.ResourceLockConfig{
|
||||||
|
Identity: lockIdentity,
|
||||||
|
EventRecorder: c.EventRecorder,
|
||||||
|
},
|
||||||
|
c.Kubeconfig,
|
||||||
|
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
|
||||||
|
if err != nil {
|
||||||
|
klog.Fatalf("error creating lock: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
|
||||||
|
Lock: rl,
|
||||||
|
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
|
||||||
|
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
|
||||||
|
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
|
||||||
|
Callbacks: callbacks,
|
||||||
|
WatchDog: electionChecker,
|
||||||
|
Name: leaseName,
|
||||||
|
})
|
||||||
|
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
|
|
||||||
|
// createInitializersFunc creates a initializersFunc that returns all initializer
|
||||||
|
// with expected as the result after filtering through filterFunc.
|
||||||
|
func createInitializersFunc(filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) ControllerInitializersFunc {
|
||||||
|
return func(loopMode ControllerLoopMode) map[string]InitFunc {
|
||||||
|
initializers := make(map[string]InitFunc)
|
||||||
|
for name, initializer := range NewControllerInitializers(loopMode) {
|
||||||
|
if filterFunc(name) == expected {
|
||||||
|
initializers[name] = initializer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return initializers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
"k8s.io/component-base/metrics"
|
"k8s.io/component-base/metrics"
|
||||||
cmconfig "k8s.io/controller-manager/config"
|
cmconfig "k8s.io/controller-manager/config"
|
||||||
cmoptions "k8s.io/controller-manager/options"
|
cmoptions "k8s.io/controller-manager/options"
|
||||||
|
migration "k8s.io/controller-manager/pkg/leadermigration/options"
|
||||||
kubecontrollerconfig "k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
|
kubecontrollerconfig "k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
|
||||||
kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
|
kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
|
||||||
csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config"
|
csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config"
|
||||||
@ -197,6 +198,7 @@ func TestAddFlags(t *testing.T) {
|
|||||||
EnableContentionProfiling: true,
|
EnableContentionProfiling: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
LeaderMigration: &migration.LeaderMigrationOptions{},
|
||||||
},
|
},
|
||||||
KubeCloudShared: &cpoptions.KubeCloudSharedOptions{
|
KubeCloudShared: &cpoptions.KubeCloudSharedOptions{
|
||||||
KubeCloudSharedConfiguration: &cpconfig.KubeCloudSharedConfiguration{
|
KubeCloudSharedConfiguration: &cpconfig.KubeCloudSharedConfiguration{
|
||||||
|
@ -52,6 +52,7 @@ import (
|
|||||||
genericcontrollermanager "k8s.io/controller-manager/app"
|
genericcontrollermanager "k8s.io/controller-manager/app"
|
||||||
"k8s.io/controller-manager/pkg/clientbuilder"
|
"k8s.io/controller-manager/pkg/clientbuilder"
|
||||||
"k8s.io/controller-manager/pkg/informerfactory"
|
"k8s.io/controller-manager/pkg/informerfactory"
|
||||||
|
"k8s.io/controller-manager/pkg/leadermigration"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -173,7 +174,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
run := func(ctx context.Context) {
|
run := func(ctx context.Context, controllerInitializers map[string]InitFunc) {
|
||||||
clientBuilder := clientbuilder.SimpleControllerClientBuilder{
|
clientBuilder := clientbuilder.SimpleControllerClientBuilder{
|
||||||
ClientConfig: c.Kubeconfig,
|
ClientConfig: c.Kubeconfig,
|
||||||
}
|
}
|
||||||
@ -187,7 +188,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
||||||
run(context.TODO())
|
run(context.TODO(), controllerInitializers)
|
||||||
panic("unreachable")
|
panic("unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -199,36 +200,62 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
|
|||||||
// add a uniquifier so that two processes on the same host don't accidentally both become active
|
// add a uniquifier so that two processes on the same host don't accidentally both become active
|
||||||
id = id + "_" + string(uuid.NewUUID())
|
id = id + "_" + string(uuid.NewUUID())
|
||||||
|
|
||||||
// Lock required for leader election
|
// leaderMigrator will be non-nil if and only if Leader Migration is enabled.
|
||||||
rl, err := resourcelock.NewFromKubeconfig(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
|
var leaderMigrator *leadermigration.LeaderMigrator = nil
|
||||||
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
|
|
||||||
c.ComponentConfig.Generic.LeaderElection.ResourceName,
|
// If leader migration is enabled, use the redirected initialization
|
||||||
resourcelock.ResourceLockConfig{
|
// Check feature gate and configuration separately so that any error in configuration checking will not
|
||||||
Identity: id,
|
// affect the result if the feature is not enabled.
|
||||||
EventRecorder: c.EventRecorder,
|
if leadermigration.Enabled(&c.ComponentConfig.Generic) {
|
||||||
},
|
klog.Info("starting leader migration")
|
||||||
c.Kubeconfig,
|
|
||||||
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
|
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
|
||||||
if err != nil {
|
"cloud-controller-manager")
|
||||||
klog.Fatalf("error creating lock: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try and become the leader and start cloud controller manager loops
|
// Start the main lock
|
||||||
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
|
go leaderElectAndRun(c, id, electionChecker,
|
||||||
Lock: rl,
|
c.ComponentConfig.Generic.LeaderElection.ResourceLock,
|
||||||
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
|
c.ComponentConfig.Generic.LeaderElection.ResourceName,
|
||||||
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
|
leaderelection.LeaderCallbacks{
|
||||||
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
|
OnStartedLeading: func(ctx context.Context) {
|
||||||
Callbacks: leaderelection.LeaderCallbacks{
|
initializers := controllerInitializers
|
||||||
OnStartedLeading: run,
|
if leaderMigrator != nil {
|
||||||
|
// If leader migration is enabled, we should start only non-migrated controllers
|
||||||
|
// for the main lock.
|
||||||
|
initializers = filterInitializers(controllerInitializers, leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
|
||||||
|
klog.Info("leader migration: starting main controllers.")
|
||||||
|
// Signal the main lock is acquired, and thus migration lock is ready to attempt.
|
||||||
|
close(leaderMigrator.MigrationReady)
|
||||||
|
}
|
||||||
|
run(ctx, initializers)
|
||||||
|
},
|
||||||
OnStoppedLeading: func() {
|
OnStoppedLeading: func() {
|
||||||
klog.Fatalf("leaderelection lost")
|
klog.Fatalf("leaderelection lost")
|
||||||
},
|
},
|
||||||
},
|
|
||||||
WatchDog: electionChecker,
|
|
||||||
Name: "cloud-controller-manager",
|
|
||||||
})
|
})
|
||||||
panic("unreachable")
|
|
||||||
|
// If Leader Migration is enabled, proceed to attempt the migration lock.
|
||||||
|
if leaderMigrator != nil {
|
||||||
|
// Wait for the signal of main lock being acquired.
|
||||||
|
<-leaderMigrator.MigrationReady
|
||||||
|
|
||||||
|
// Start the migration lock.
|
||||||
|
go leaderElectAndRun(c, id, electionChecker,
|
||||||
|
c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
|
||||||
|
c.ComponentConfig.Generic.LeaderMigration.LeaderName,
|
||||||
|
leaderelection.LeaderCallbacks{
|
||||||
|
OnStartedLeading: func(ctx context.Context) {
|
||||||
|
klog.Info("leader migration: starting migrated controllers.")
|
||||||
|
run(ctx, filterInitializers(controllerInitializers, leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
|
||||||
|
},
|
||||||
|
OnStoppedLeading: func() {
|
||||||
|
klog.Fatalf("migration leaderelection lost")
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// startControllers starts the cloud specific controller loops.
|
// startControllers starts the cloud specific controller loops.
|
||||||
@ -417,3 +444,48 @@ func ResyncPeriod(c *cloudcontrollerconfig.CompletedConfig) func() time.Duration
|
|||||||
return time.Duration(float64(c.ComponentConfig.Generic.MinResyncPeriod.Nanoseconds()) * factor)
|
return time.Duration(float64(c.ComponentConfig.Generic.MinResyncPeriod.Nanoseconds()) * factor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
|
||||||
|
// TODO: extract this function into staging/controller-manager
|
||||||
|
func leaderElectAndRun(c *cloudcontrollerconfig.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
|
||||||
|
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
|
||||||
|
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
|
||||||
|
leaseName,
|
||||||
|
resourcelock.ResourceLockConfig{
|
||||||
|
Identity: lockIdentity,
|
||||||
|
EventRecorder: c.EventRecorder,
|
||||||
|
},
|
||||||
|
c.Kubeconfig,
|
||||||
|
c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
|
||||||
|
if err != nil {
|
||||||
|
klog.Fatalf("error creating lock: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
|
||||||
|
Lock: rl,
|
||||||
|
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
|
||||||
|
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
|
||||||
|
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
|
||||||
|
Callbacks: callbacks,
|
||||||
|
WatchDog: electionChecker,
|
||||||
|
Name: leaseName,
|
||||||
|
})
|
||||||
|
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
|
|
||||||
|
// filterInitializers returns initializers that has filterFunc(name) == expected.
|
||||||
|
// filterFunc can be nil, in which case the original initializers will be returned directly.
|
||||||
|
// InitFunc is local to cloud-controller-manager, and thus filterInitializers has to be local too.
|
||||||
|
func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) map[string]InitFunc {
|
||||||
|
if filterFunc == nil {
|
||||||
|
return allInitializers
|
||||||
|
}
|
||||||
|
initializers := make(map[string]InitFunc)
|
||||||
|
for name, initFunc := range allInitializers {
|
||||||
|
if filterFunc(name) == expected {
|
||||||
|
initializers[name] = initFunc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return initializers
|
||||||
|
}
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
componentbaseconfig "k8s.io/component-base/config"
|
componentbaseconfig "k8s.io/component-base/config"
|
||||||
cmconfig "k8s.io/controller-manager/config"
|
cmconfig "k8s.io/controller-manager/config"
|
||||||
cmoptions "k8s.io/controller-manager/options"
|
cmoptions "k8s.io/controller-manager/options"
|
||||||
|
migration "k8s.io/controller-manager/pkg/leadermigration/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDefaultFlags(t *testing.T) {
|
func TestDefaultFlags(t *testing.T) {
|
||||||
@ -65,6 +66,7 @@ func TestDefaultFlags(t *testing.T) {
|
|||||||
EnableContentionProfiling: false,
|
EnableContentionProfiling: false,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
LeaderMigration: &migration.LeaderMigrationOptions{},
|
||||||
},
|
},
|
||||||
KubeCloudShared: &KubeCloudSharedOptions{
|
KubeCloudShared: &KubeCloudSharedOptions{
|
||||||
KubeCloudSharedConfiguration: &cpconfig.KubeCloudSharedConfiguration{
|
KubeCloudSharedConfiguration: &cpconfig.KubeCloudSharedConfiguration{
|
||||||
@ -203,6 +205,7 @@ func TestAddFlags(t *testing.T) {
|
|||||||
EnableContentionProfiling: true,
|
EnableContentionProfiling: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
LeaderMigration: &migration.LeaderMigrationOptions{},
|
||||||
},
|
},
|
||||||
KubeCloudShared: &KubeCloudSharedOptions{
|
KubeCloudShared: &KubeCloudSharedOptions{
|
||||||
KubeCloudSharedConfiguration: &cpconfig.KubeCloudSharedConfiguration{
|
KubeCloudSharedConfiguration: &cpconfig.KubeCloudSharedConfiguration{
|
||||||
|
@ -42,7 +42,7 @@ func NewGenericControllerManagerConfigurationOptions(cfg *cmconfig.GenericContro
|
|||||||
o := &GenericControllerManagerConfigurationOptions{
|
o := &GenericControllerManagerConfigurationOptions{
|
||||||
GenericControllerManagerConfiguration: cfg,
|
GenericControllerManagerConfiguration: cfg,
|
||||||
Debugging: RecommendedDebuggingOptions(),
|
Debugging: RecommendedDebuggingOptions(),
|
||||||
LeaderMigration: nil,
|
LeaderMigration: &migration.LeaderMigrationOptions{},
|
||||||
}
|
}
|
||||||
|
|
||||||
return o
|
return o
|
||||||
|
@ -0,0 +1,35 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package leadermigration
|
||||||
|
|
||||||
|
// FilterResult indicates whether and how the controller manager should start the controller.
type FilterResult int32

// The constants below are explicitly typed as FilterResult so that they cannot be
// mixed with unrelated integers by accident.
const (
	// ControllerUnowned indicates that the controller is owned by another controller manager
	// and thus should NOT be started by this controller manager.
	ControllerUnowned FilterResult = iota
	// ControllerMigrated indicates that the controller manager should start this controller
	// with the migration lock.
	ControllerMigrated
	// ControllerNonMigrated indicates that the controller manager should start this controller
	// with the main lock.
	ControllerNonMigrated
)

// FilterFunc takes the name of a controller and returns a FilterResult indicating how
// the controller should be started.
type FilterFunc func(controllerName string) FilterResult
|
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package leadermigration
|
||||||
|
|
||||||
|
import (
|
||||||
|
internal "k8s.io/controller-manager/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LeaderMigrator holds information required by the leader migration process.
|
||||||
|
type LeaderMigrator struct {
|
||||||
|
// MigrationReady is closed after the controller manager finishes preparing for the migration lock.
|
||||||
|
// After this point, the leader migration process will proceed to acquire the migration lock.
|
||||||
|
MigrationReady chan struct{}
|
||||||
|
|
||||||
|
// FilterFunc returns a FilterResult telling the controller manager what to do with the controller.
|
||||||
|
FilterFunc FilterFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLeaderMigrator creates a LeaderMigrator with given config for the given component. component
|
||||||
|
// indicates which controller manager is requesting this leader migration, and it should be consistent
|
||||||
|
// with the component field of ControllerLeaderConfiguration.
|
||||||
|
func NewLeaderMigrator(config *internal.LeaderMigrationConfiguration, component string) *LeaderMigrator {
|
||||||
|
migratedControllers := make(map[string]bool)
|
||||||
|
for _, leader := range config.ControllerLeaders {
|
||||||
|
migratedControllers[leader.Name] = leader.Component == component
|
||||||
|
}
|
||||||
|
return &LeaderMigrator{
|
||||||
|
MigrationReady: make(chan struct{}),
|
||||||
|
FilterFunc: func(controllerName string) FilterResult {
|
||||||
|
shouldRun, ok := migratedControllers[controllerName]
|
||||||
|
if ok {
|
||||||
|
// The controller is included in the migration
|
||||||
|
if shouldRun {
|
||||||
|
// If the controller manager should run the controller,
|
||||||
|
// start it in the migration lock.
|
||||||
|
return ControllerMigrated
|
||||||
|
}
|
||||||
|
// Otherwise, the controller should be started by
|
||||||
|
// some other controller manager.
|
||||||
|
return ControllerUnowned
|
||||||
|
}
|
||||||
|
// The controller is not included in the migration,
|
||||||
|
// and should be started in the main lock.
|
||||||
|
return ControllerNonMigrated
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,119 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package leadermigration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
internal "k8s.io/controller-manager/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLeaderMigratorFilterFunc(t *testing.T) {
|
||||||
|
fromConfig := &internal.LeaderMigrationConfiguration{
|
||||||
|
ResourceLock: "leases",
|
||||||
|
LeaderName: "cloud-provider-extraction-migration",
|
||||||
|
ControllerLeaders: []internal.ControllerLeaderConfiguration{
|
||||||
|
{
|
||||||
|
Name: "route",
|
||||||
|
Component: "kube-controller-manager",
|
||||||
|
}, {
|
||||||
|
Name: "service",
|
||||||
|
Component: "kube-controller-manager",
|
||||||
|
}, {
|
||||||
|
Name: "cloud-node-lifecycle",
|
||||||
|
Component: "kube-controller-manager",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
toConfig := &internal.LeaderMigrationConfiguration{
|
||||||
|
ResourceLock: "leases",
|
||||||
|
LeaderName: "cloud-provider-extraction-migration",
|
||||||
|
ControllerLeaders: []internal.ControllerLeaderConfiguration{
|
||||||
|
{
|
||||||
|
Name: "route",
|
||||||
|
Component: "cloud-controller-manager",
|
||||||
|
}, {
|
||||||
|
Name: "service",
|
||||||
|
Component: "cloud-controller-manager",
|
||||||
|
}, {
|
||||||
|
Name: "cloud-node-lifecycle",
|
||||||
|
Component: "cloud-controller-manager",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
config *internal.LeaderMigrationConfiguration
|
||||||
|
component string
|
||||||
|
migrated bool
|
||||||
|
expectResult map[string]FilterResult
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "from config, kcm",
|
||||||
|
config: fromConfig,
|
||||||
|
component: "kube-controller-manager",
|
||||||
|
expectResult: map[string]FilterResult{
|
||||||
|
"deployment": ControllerNonMigrated,
|
||||||
|
"route": ControllerMigrated,
|
||||||
|
"service": ControllerMigrated,
|
||||||
|
"cloud-node-lifecycle": ControllerMigrated,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "from config, ccm",
|
||||||
|
config: fromConfig,
|
||||||
|
component: "cloud-controller-manager",
|
||||||
|
expectResult: map[string]FilterResult{
|
||||||
|
"cloud-node": ControllerNonMigrated,
|
||||||
|
"route": ControllerUnowned,
|
||||||
|
"service": ControllerUnowned,
|
||||||
|
"cloud-node-lifecycle": ControllerUnowned,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "to config, kcm",
|
||||||
|
config: toConfig,
|
||||||
|
component: "kube-controller-manager",
|
||||||
|
expectResult: map[string]FilterResult{
|
||||||
|
"deployment": ControllerNonMigrated,
|
||||||
|
"route": ControllerUnowned,
|
||||||
|
"service": ControllerUnowned,
|
||||||
|
"cloud-node-lifecycle": ControllerUnowned,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "to config, ccm",
|
||||||
|
config: toConfig,
|
||||||
|
component: "cloud-controller-manager",
|
||||||
|
expectResult: map[string]FilterResult{
|
||||||
|
"cloud-node": ControllerNonMigrated,
|
||||||
|
"route": ControllerMigrated,
|
||||||
|
"service": ControllerMigrated,
|
||||||
|
"cloud-node-lifecycle": ControllerMigrated,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
migrator := NewLeaderMigrator(tc.config, tc.component)
|
||||||
|
for name, expected := range tc.expectResult {
|
||||||
|
if result := migrator.FilterFunc(name); expected != result {
|
||||||
|
t.Errorf("controller %s, expect %v, got %v", name, expected, result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package leadermigration
|
||||||
|
|
||||||
|
import config "k8s.io/controller-manager/config"
|
||||||
|
|
||||||
|
// Enabled checks whether Leader Migration should be enabled, given the GenericControllerManagerConfiguration.
|
||||||
|
// It considers the feature gate first, and will always return false if the feature gate is not enabled.
|
||||||
|
func Enabled(genericConfig *config.GenericControllerManagerConfiguration) bool {
|
||||||
|
return FeatureEnabled() && genericConfig.LeaderElection.LeaderElect && genericConfig.LeaderMigrationEnabled
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user