Implementation of leader migration.

Indeed 2021-03-03 18:08:45 -08:00
parent 68ebe29529
commit 721b1822d6
4 changed files with 340 additions and 72 deletions

View File

@@ -61,6 +61,7 @@ import (
 	genericcontrollermanager "k8s.io/controller-manager/app"
 	"k8s.io/controller-manager/pkg/clientbuilder"
 	"k8s.io/controller-manager/pkg/informerfactory"
+	"k8s.io/controller-manager/pkg/leadermigration"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
 	"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
@@ -207,32 +208,17 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
 		}
 	}
 
-	run := func(ctx context.Context) {
-		rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{
-			ClientConfig: c.Kubeconfig,
-		}
-		var clientBuilder clientbuilder.ControllerClientBuilder
-		if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
-			if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
-				// It's possible another controller process is creating the tokens for us.
-				// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
-				klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
-			}
-
-			clientBuilder = clientbuilder.NewDynamicClientBuilder(
-				restclient.AnonymousClientConfig(c.Kubeconfig),
-				c.Client.CoreV1(),
-				metav1.NamespaceSystem)
-		} else {
-			clientBuilder = rootClientBuilder
-		}
+	clientBuilder, rootClientBuilder := createClientBuilders(c)
+
+	saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
+
+	run := func(ctx context.Context, startSATokenController InitFunc, filterFunc leadermigration.FilterFunc) {
 		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
 		if err != nil {
 			klog.Fatalf("error building controller context: %v", err)
 		}
-		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
-		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
+		controllerInitializers := filterInitializers(NewControllerInitializers(controllerContext.LoopMode), filterFunc)
+		if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux); err != nil {
 			klog.Fatalf("error starting controllers: %v", err)
 		}
@@ -243,8 +229,9 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
 		select {}
 	}
 
 	// No leader election, run directly
 	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
-		run(context.TODO())
+		run(context.TODO(), saTokenControllerInitFunc, nil)
 		panic("unreachable")
 	}
@@ -256,33 +243,102 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
 	// add a uniquifier so that two processes on the same host don't accidentally both become active
 	id = id + "_" + string(uuid.NewUUID())
 
-	rl, err := resourcelock.NewFromKubeconfig(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
-		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
-		c.ComponentConfig.Generic.LeaderElection.ResourceName,
-		resourcelock.ResourceLockConfig{
-			Identity:      id,
-			EventRecorder: c.EventRecorder,
-		},
-		c.Kubeconfig,
-		c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
-	if err != nil {
-		klog.Fatalf("error creating lock: %v", err)
-	}
-
-	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
-		Lock:          rl,
-		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
-		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
-		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: run,
-			OnStoppedLeading: func() {
-				klog.Fatalf("leaderelection lost")
-			},
-		},
-		WatchDog: electionChecker,
-		Name:     "kube-controller-manager",
-	})
+	// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
+	leaderElectAndRun := func(resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
+		rl, err := resourcelock.NewFromKubeconfig(resourceLock,
+			c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
+			leaseName,
+			resourcelock.ResourceLockConfig{
+				Identity:      id,
+				EventRecorder: c.EventRecorder,
+			},
+			c.Kubeconfig,
+			c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
+		if err != nil {
+			klog.Fatalf("error creating lock: %v", err)
+		}
+
+		leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
+			Lock:          rl,
+			LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
+			RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
+			RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
+			Callbacks:     callbacks,
+			WatchDog:      electionChecker,
+			Name:          leaseName,
+		})
+	}
+
+	// If leader migration is enabled, use the redirected initialization.
+	// Check the feature gate and the configuration separately, so that any error in checking the
+	// configuration does not affect the result if the feature is not enabled.
+	if leadermigration.FeatureEnabled() && leadermigration.Enabled(&c.ComponentConfig.Generic) {
+		klog.Infof("starting leader migration")
+
+		leaderMigrator := leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
+			"kube-controller-manager")
+
+		// We need a channel to signal the start of the Service Account Token Controller;
+		// all migrated controllers must start afterwards.
+		saTokenControllerStarted := make(chan struct{})
+		// Wrap saTokenControllerInitFunc to close the channel after returning.
+		wrappedSATokenControllerInitFunc := func(ctx ControllerContext) (http.Handler, bool, error) {
+			defer close(saTokenControllerStarted)
+			return saTokenControllerInitFunc(ctx)
+		}
+
+		// Start the main lock, using LeaderElectionConfiguration for the type and name of the lease.
+		go leaderElectAndRun(
+			c.ComponentConfig.Generic.LeaderElection.ResourceLock,
+			c.ComponentConfig.Generic.LeaderElection.ResourceName,
+			leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(ctx context.Context) {
+					klog.Info("leader migration: starting main controllers.")
+					// saTokenController should only run under the main lock
+					run(ctx, wrappedSATokenControllerInitFunc, leaderMigrator.FilterFunc(false))
+				},
+				OnStoppedLeading: func() {
+					klog.Fatalf("leaderelection lost")
+				},
+			})
+
+		// Wait for the Service Account Token Controller to start before acquiring the migration lock.
+		// At this point, the main lock must have already been acquired, or the KCM process has already exited.
+		// We wait for the main lock before acquiring the migration lock to prevent the situation
+		// where KCM instance A holds the main lock while KCM instance B holds the migration lock.
+		<-saTokenControllerStarted
+
+		// Start the migration lock, using LeaderMigrationConfiguration for the type and name of the lease.
+		leaderElectAndRun(
+			c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
+			c.ComponentConfig.Generic.LeaderMigration.LeaderName,
+			leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(ctx context.Context) {
+					klog.Info("leader migration: starting migrated controllers.")
+					// DO NOT start the saTokenController under the migration lock (pass nil instead)
+					run(ctx, nil, leaderMigrator.FilterFunc(true))
+				},
+				OnStoppedLeading: func() {
+					klog.Fatalf("migration leaderelection lost")
+				},
+			})
+		panic("unreachable")
+	} // end of leader migration
+
+	// Normal leader election, without leader migration
+	leaderElectAndRun(
+		c.ComponentConfig.Generic.LeaderElection.ResourceLock,
+		c.ComponentConfig.Generic.LeaderElection.ResourceName,
+		leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(ctx context.Context) {
+				run(ctx, saTokenControllerInitFunc, nil)
+			},
+			OnStoppedLeading: func() {
+				klog.Fatalf("leaderelection lost")
+			},
+		})
 	panic("unreachable")
 }
@@ -504,8 +560,10 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
 func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
 	// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
 	// If this fails, just return here and fail since other controllers won't be able to get credentials.
-	if _, _, err := startSATokenController(ctx); err != nil {
-		return err
+	if startSATokenController != nil {
+		if _, _, err := startSATokenController(ctx); err != nil {
+			return err
+		}
 	}
 
 	// Initialize the cloud provider with a reference to the clientBuilder only after token controller
@@ -609,3 +667,40 @@ func readCA(file string) ([]byte, error) {
 	return rootCA, err
 }
+
+// createClientBuilders creates clientBuilder and rootClientBuilder from the given configuration
+func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilder.ControllerClientBuilder, rootClientBuilder clientbuilder.ControllerClientBuilder) {
+	rootClientBuilder = clientbuilder.SimpleControllerClientBuilder{
+		ClientConfig: c.Kubeconfig,
+	}
+	if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
+		if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
+			// It's possible another controller process is creating the tokens for us.
+			// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
+			klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
+		}
+
+		clientBuilder = clientbuilder.NewDynamicClientBuilder(
+			restclient.AnonymousClientConfig(c.Kubeconfig),
+			c.Client.CoreV1(),
+			metav1.NamespaceSystem)
+	} else {
+		clientBuilder = rootClientBuilder
+	}
+	return
+}
+
+// filterInitializers returns the initializers that have filterFunc(name) == true.
+// filterFunc can be nil, in which case the original initializers will be returned directly.
+func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc) map[string]InitFunc {
+	if filterFunc == nil {
+		return allInitializers
+	}
+	initializers := make(map[string]InitFunc)
+	for name, initFunc := range allInitializers {
+		if filterFunc(name) {
+			initializers[name] = initFunc
+		}
+	}
+	return initializers
+}
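
Editor's aside (not part of the commit): the ordering trick above, wrapping an InitFunc so that a channel is closed once the function returns, is easy to see in isolation. Below is a minimal, self-contained Go sketch using toy stand-ins for ControllerContext and InitFunc (the real types live in this package); it illustrates the pattern, it is not the actual kube-controller-manager code.

package main

import (
	"fmt"
	"net/http"
)

// Toy stand-ins for the real ControllerContext and InitFunc types.
type ControllerContext struct{}
type InitFunc func(ctx ControllerContext) (http.Handler, bool, error)

func main() {
	saTokenControllerStarted := make(chan struct{})

	// A fake SA token controller init func, standing in for startServiceAccountTokenController.
	saTokenControllerInitFunc := InitFunc(func(ctx ControllerContext) (http.Handler, bool, error) {
		fmt.Println("service account token controller started")
		return nil, true, nil
	})

	// Wrap the init func so the channel closes once it has returned; this is the same
	// trick the commit uses to order the migration lease after the SA token controller.
	wrapped := InitFunc(func(ctx ControllerContext) (http.Handler, bool, error) {
		defer close(saTokenControllerStarted)
		return saTokenControllerInitFunc(ctx)
	})

	go wrapped(ControllerContext{})

	<-saTokenControllerStarted // blocks until the wrapped init func has run
	fmt.Println("SA token controller started; safe to attempt the migration lease")
}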

View File

@@ -52,6 +52,7 @@ import (
 	genericcontrollermanager "k8s.io/controller-manager/app"
 	"k8s.io/controller-manager/pkg/clientbuilder"
 	"k8s.io/controller-manager/pkg/informerfactory"
+	"k8s.io/controller-manager/pkg/leadermigration"
 	"k8s.io/klog/v2"
 )
@@ -173,7 +174,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
 		}
 	}
 
-	run := func(ctx context.Context) {
+	run := func(ctx context.Context, controllerInitializers map[string]InitFunc) {
 		clientBuilder := clientbuilder.SimpleControllerClientBuilder{
 			ClientConfig: c.Kubeconfig,
 		}
@@ -187,7 +188,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
 	}
 
 	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
-		run(context.TODO())
+		run(context.TODO(), controllerInitializers)
 		panic("unreachable")
 	}
@@ -199,35 +200,96 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
 	// add a uniquifier so that two processes on the same host don't accidentally both become active
 	id = id + "_" + string(uuid.NewUUID())
 
-	// Lock required for leader election
-	rl, err := resourcelock.NewFromKubeconfig(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
-		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
-		c.ComponentConfig.Generic.LeaderElection.ResourceName,
-		resourcelock.ResourceLockConfig{
-			Identity:      id,
-			EventRecorder: c.EventRecorder,
-		},
-		c.Kubeconfig,
-		c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
-	if err != nil {
-		klog.Fatalf("error creating lock: %v", err)
-	}
-
-	// Try and become the leader and start cloud controller manager loops
-	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
-		Lock:          rl,
-		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
-		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
-		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: run,
-			OnStoppedLeading: func() {
-				klog.Fatalf("leaderelection lost")
-			},
-		},
-		WatchDog: electionChecker,
-		Name:     "cloud-controller-manager",
-	})
+	// leaderElectAndRun runs the callbacks once the leader lease is acquired.
+	leaderElectAndRun := func(resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
+		// Lock required for leader election
+		rl, err := resourcelock.NewFromKubeconfig(resourceLock,
+			c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
+			leaseName,
+			resourcelock.ResourceLockConfig{
+				Identity:      id,
+				EventRecorder: c.EventRecorder,
+			},
+			c.Kubeconfig,
+			c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
+		if err != nil {
+			klog.Fatalf("error creating lock: %v", err)
+		}
+
+		// Try and become the leader and start cloud controller manager loops
+		leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
+			Lock:          rl,
+			LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
+			RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
+			RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
+			Callbacks:     callbacks,
+			WatchDog:      electionChecker,
+			Name:          leaseName,
+		})
+	}
+
+	// If leader migration is enabled, use the redirected initialization.
+	// Check the feature gate and the configuration separately, so that any error in checking the
+	// configuration does not affect the result if the feature is not enabled.
+	if leadermigration.FeatureEnabled() && leadermigration.Enabled(&c.ComponentConfig.Generic) {
+		klog.Info("starting leader migration")
+
+		leaderMigrator := leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
+			"cloud-controller-manager")
+
+		// Split initializers based on which lease they require, the main lease or the migration lease.
+		migratedInitializers, unmigratedInitializers := devideInitializers(controllerInitializers, leaderMigrator.FilterFunc(true))
+
+		// We need to signal acquiring the main lock before attempting to acquire the migration lock,
+		// so that no instance will hold the migration lock but not the main lock at any point.
+		mainLockAcquired := make(chan struct{})
+
+		// Start the main lock.
+		go leaderElectAndRun(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
+			c.ComponentConfig.Generic.LeaderElection.ResourceName,
+			leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(ctx context.Context) {
+					klog.Info("leader migration: starting main controllers.")
+					// signal acquiring the main lock
+					close(mainLockAcquired)
+					run(ctx, unmigratedInitializers)
+				},
+				OnStoppedLeading: func() {
+					klog.Fatalf("leaderelection lost")
+				},
+			})
+
+		// Wait for the main lease to be acquired before attempting the migration lease.
+		<-mainLockAcquired
+
+		// Start the migration lock.
+		leaderElectAndRun(c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
+			c.ComponentConfig.Generic.LeaderMigration.LeaderName,
+			leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(ctx context.Context) {
+					klog.Info("leader migration: starting migrated controllers.")
+					run(ctx, migratedInitializers)
+				},
+				OnStoppedLeading: func() {
+					klog.Fatalf("migration leaderelection lost")
+				},
+			})
+		panic("unreachable")
+	} // end of leader migration
+
+	// Normal leader election, without leader migration
+	leaderElectAndRun(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
+		c.ComponentConfig.Generic.LeaderElection.ResourceName,
+		leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(ctx context.Context) {
+				run(ctx, controllerInitializers)
+			},
+			OnStoppedLeading: func() {
+				klog.Fatalf("leaderelection lost")
+			},
+		})
 	panic("unreachable")
 }
@@ -417,3 +479,23 @@ func ResyncPeriod(c *cloudcontrollerconfig.CompletedConfig) func() time.Duration
 		return time.Duration(float64(c.ComponentConfig.Generic.MinResyncPeriod.Nanoseconds()) * factor)
 	}
 }
+
+// devideInitializers applies filterFunc to each of the given initializers.
+// filtered contains all controllers that have filterFunc(name) == true
+// rest contains all controllers that have filterFunc(name) == false
+func devideInitializers(allInitializers map[string]InitFunc,
+	filterFunc leadermigration.FilterFunc) (filtered map[string]InitFunc, rest map[string]InitFunc) {
+	if filterFunc == nil {
+		return make(map[string]InitFunc), allInitializers
+	}
+	filtered = make(map[string]InitFunc)
+	rest = make(map[string]InitFunc)
+	for name, initFunc := range allInitializers {
+		if filterFunc(name) {
+			filtered[name] = initFunc
+		} else {
+			rest[name] = initFunc
+		}
+	}
+	return
+}
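
Editor's aside (not part of the commit): both managers rely on the same two-lease handshake, closing a channel inside the main lease's OnStartedLeading before the migration lease is ever attempted. A minimal sketch of that ordering follows, with a fake election standing in for leaderElectAndRun; the lease names are illustrative, not taken from this diff.

package main

import (
	"fmt"
	"time"
)

// acquireLeaseAndRun is a toy stand-in for leaderElectAndRun: pretend to win
// the election for the named lease, then invoke the callback.
func acquireLeaseAndRun(lease string, onStartedLeading func()) {
	time.Sleep(10 * time.Millisecond) // simulated election
	fmt.Println("acquired lease:", lease)
	onStartedLeading()
}

func main() {
	mainLockAcquired := make(chan struct{})

	// Main lease first, in a goroutine, mirroring `go leaderElectAndRun(...)`.
	go acquireLeaseAndRun("cloud-controller-manager", func() {
		close(mainLockAcquired) // signal before starting controllers
		fmt.Println("starting unmigrated controllers")
	})

	// Block until the main lease is held, so no instance can hold the
	// migration lease without also holding the main lease.
	<-mainLockAcquired
	acquireLeaseAndRun("example-migration-lease", func() {
		fmt.Println("starting migrated controllers")
	})
}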

View File

@@ -0,0 +1,67 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package leadermigration
+
+import (
+	internal "k8s.io/controller-manager/config"
+)
+
+// LeaderMigrator holds information required by the leader migration process.
+type LeaderMigrator struct {
+	config *internal.LeaderMigrationConfiguration
+
+	migratedControllers map[string]bool
+
+	// component indicates the name of the control-plane component that uses leader migration,
+	// which should be a controller manager, i.e. kube-controller-manager or cloud-controller-manager
+	component string
+}
+
+// FilterFunc takes the name of a controller, returning whether the controller should be started.
+type FilterFunc func(controllerName string) bool
+
+// NewLeaderMigrator creates a LeaderMigrator with the given config for the given component. The component
+// indicates which controller manager is requesting this leader migration, and it should be consistent
+// with the component field of ControllerLeaderConfiguration.
+func NewLeaderMigrator(config *internal.LeaderMigrationConfiguration, component string) *LeaderMigrator {
+	migratedControllers := make(map[string]bool)
+	for _, leader := range config.ControllerLeaders {
+		migratedControllers[leader.Name] = leader.Component == component
+	}
+	return &LeaderMigrator{
+		config:              config,
+		migratedControllers: migratedControllers,
+		component:           component,
+	}
+}
+
+// FilterFunc returns the filter function that, when migrated == true,
+//   - returns true if the controller should start under the migration lock
+//   - returns false if the controller should start under the main lock
+// When migrated == false, the result is inverted.
+func (m *LeaderMigrator) FilterFunc(migrated bool) FilterFunc {
+	return func(controllerName string) bool {
+		shouldRun, ok := m.migratedControllers[controllerName]
+		if !ok {
+			// The controller is not included in the migration.
+			// If the caller wants the controllers outside the migration, we should include it.
+			return !migrated
+		}
+		// The controller is included in the migration.
+		// If the caller wants the controllers within the migration, we should only include it
+		// if the current component should run this controller.
+		return migrated && shouldRun
+	}
+}
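
Editor's aside (not part of the commit): the FilterFunc semantics are easiest to see against a concrete configuration. A small sketch, assuming the ControllerLeaderConfiguration type exported by k8s.io/controller-manager/config (the type name comes from the doc comment above); the controller names are illustrative.

package main

import (
	"fmt"

	internal "k8s.io/controller-manager/config"
	"k8s.io/controller-manager/pkg/leadermigration"
)

func main() {
	// Hypothetical config: "route" stays with kube-controller-manager,
	// "cloud-node" moves to cloud-controller-manager.
	cfg := &internal.LeaderMigrationConfiguration{
		ControllerLeaders: []internal.ControllerLeaderConfiguration{
			{Name: "route", Component: "kube-controller-manager"},
			{Name: "cloud-node", Component: "cloud-controller-manager"},
		},
	}
	m := leadermigration.NewLeaderMigrator(cfg, "kube-controller-manager")

	underMigrationLock := m.FilterFunc(true)
	underMainLock := m.FilterFunc(false)

	fmt.Println(underMigrationLock("route"))      // true: migrated, and this component owns it
	fmt.Println(underMigrationLock("cloud-node")) // false: migrated, but owned by the other component
	fmt.Println(underMainLock("deployment"))      // true: not part of the migration at all
}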

View File

@@ -0,0 +1,24 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package leadermigration
+
+import config "k8s.io/controller-manager/config"
+
+// Enabled checks whether Leader Migration should be enabled, given the GenericControllerManagerConfiguration.
+func Enabled(genericConfig *config.GenericControllerManagerConfiguration) bool {
+	return genericConfig.LeaderElection.LeaderElect && genericConfig.LeaderMigrationEnabled
+}
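
Editor's aside (not part of the commit): Enabled deliberately requires LeaderElect as well, since without leader election there is no main lease to migrate from. A minimal sketch of the gating behavior, using only the field names visible in the function above:

package main

import (
	"fmt"

	config "k8s.io/controller-manager/config"
	"k8s.io/controller-manager/pkg/leadermigration"
)

func main() {
	generic := &config.GenericControllerManagerConfiguration{}

	// Both flags on: migration is enabled.
	generic.LeaderElection.LeaderElect = true
	generic.LeaderMigrationEnabled = true
	fmt.Println(leadermigration.Enabled(generic)) // true

	// Without leader election there are no leases to coordinate,
	// so migration is reported as disabled even when requested.
	generic.LeaderElection.LeaderElect = false
	fmt.Println(leadermigration.Enabled(generic)) // false
}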