change filter to return a FilterResult.

This commit is contained in:
Indeed 2021-03-09 13:22:53 -08:00
parent e8479414ab
commit ba47f60e4b
5 changed files with 196 additions and 65 deletions

View File

@ -212,12 +212,12 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
run := func(ctx context.Context, startSATokenController InitFunc, filterFunc leadermigration.FilterFunc) {
run := func(ctx context.Context, startSATokenController InitFunc, filterFunc leadermigration.FilterFunc, requiredResult leadermigration.FilterResult) {
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
controllerInitializers := filterInitializers(NewControllerInitializers(controllerContext.LoopMode), filterFunc)
controllerInitializers := filterInitializers(NewControllerInitializers(controllerContext.LoopMode), filterFunc, requiredResult)
if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
@ -231,7 +231,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// No leader election, run directly
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO(), saTokenControllerInitFunc, nil)
run(context.TODO(), saTokenControllerInitFunc, nil, leadermigration.ControllerNonMigrated)
panic("unreachable")
}
@ -274,10 +274,10 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
if leaderMigrator != nil {
// If leader migration is enabled, we should start only non-migrated controllers
// for the main lock.
filterFunc = leaderMigrator.FilterFunc(false)
filterFunc = leaderMigrator.FilterFunc
klog.Info("leader migration: starting main controllers.")
}
run(ctx, startSATokenController, filterFunc)
run(ctx, startSATokenController, filterFunc, leadermigration.ControllerNonMigrated)
},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
@ -300,7 +300,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
OnStartedLeading: func(ctx context.Context) {
klog.Info("leader migration: starting migrated controllers.")
// DO NOT start saTokenController under migration lock (passing nil)
run(ctx, nil, leaderMigrator.FilterFunc(true))
run(ctx, nil, leaderMigrator.FilterFunc, leadermigration.ControllerMigrated)
},
OnStoppedLeading: func() {
klog.Fatalf("migration leaderelection lost")
@ -688,16 +688,16 @@ func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionC
panic("unreachable")
}
// filterInitializers returns initializers that has filterFunc(name) == true.
// filterInitializers returns initializers that has filterFunc(name) == expected.
// filterFunc can be nil, in which case the original initializers will be returned directly.
// InitFunc is local to kube-controller-manager, and thus filterInitializers has to be local too.
func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc) map[string]InitFunc {
func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) map[string]InitFunc {
if filterFunc == nil {
return allInitializers
}
initializers := make(map[string]InitFunc)
for name, initFunc := range allInitializers {
if filterFunc(name) {
if filterFunc(name) == expected {
initializers[name] = initFunc
}
}

View File

@ -203,10 +203,6 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
// leaderMigrator will be non-nil if and only if Leader Migration is enabled.
var leaderMigrator *leadermigration.LeaderMigrator = nil
// Leader migration requires splitting initializers for the main and the migration lock
// If leader migration is not enabled, these two will be unused.
var migratedInitializers, unmigratedInitializers map[string]InitFunc
// If leader migration is enabled, use the redirected initialization
// Check feature gate and configuration separately so that any error in configuration checking will not
// affect the result if the feature is not enabled.
@ -215,9 +211,6 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
"cloud-controller-manager")
// Split initializers based on which lease they require, main lease or migration lease.
migratedInitializers, unmigratedInitializers = divideInitializers(controllerInitializers, leaderMigrator.FilterFunc(true))
}
// Start the main lock
@ -230,7 +223,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
if leaderMigrator != nil {
// If leader migration is enabled, we should start only non-migrated controllers
// for the main lock.
initializers = unmigratedInitializers
initializers = filterInitializers(controllerInitializers, leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
klog.Info("leader migration: starting main controllers.")
// Signal the main lock is acquired, and thus migration lock is ready to attempt.
close(leaderMigrator.MigrationReady)
@ -254,7 +247,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Info("leader migration: starting migrated controllers.")
run(ctx, migratedInitializers)
run(ctx, filterInitializers(controllerInitializers, leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
},
OnStoppedLeading: func() {
klog.Fatalf("migration leaderelection lost")
@ -481,23 +474,18 @@ func leaderElectAndRun(c *cloudcontrollerconfig.CompletedConfig, lockIdentity st
panic("unreachable")
}
// divideInitializers applies filterFunc to each of the given intializiers.
// filtered contains all controllers that have filterFunc(name) == true
// rest contains all controllers that have filterFunc(name) == false
// InitFunc is local to cloud-controller-manager, and thus divideInitializers has to be local too.
func divideInitializers(allInitializers map[string]InitFunc,
filterFunc leadermigration.FilterFunc) (filtered map[string]InitFunc, rest map[string]InitFunc) {
// filterInitializers returns initializers that has filterFunc(name) == expected.
// filterFunc can be nil, in which case the original initializers will be returned directly.
// InitFunc is local to cloud-controller-manager, and thus filterInitializers has to be local too.
func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) map[string]InitFunc {
if filterFunc == nil {
return make(map[string]InitFunc), allInitializers
return allInitializers
}
filtered = make(map[string]InitFunc)
rest = make(map[string]InitFunc)
initializers := make(map[string]InitFunc)
for name, initFunc := range allInitializers {
if filterFunc(name) {
filtered[name] = initFunc
} else {
rest[name] = initFunc
if filterFunc(name) == expected {
initializers[name] = initFunc
}
}
return
return initializers
}

View File

@ -0,0 +1,35 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leadermigration
// FilterResult indicates whether and how the controller manager should start the controller.
type FilterResult int32
const (
// ControllerOwned indicates that the controller is owned by another controller manager
// and thus should NOT be started by this controller manager.
ControllerUnowned = iota
// ControllerMigrated indicates that the controller manager should start this controller
// with thte migration lock.
ControllerMigrated
// ControllerNonMigrated indicates that the controller manager should start this controller
// with thte main lock.
ControllerNonMigrated
)
// FilterFunc takes a name of controller, returning a FilterResult indicating how to start controller.
type FilterFunc func(controllerName string) FilterResult

View File

@ -22,21 +22,15 @@ import (
// LeaderMigrator holds information required by the leader migration process.
type LeaderMigrator struct {
// MigrationReady is closed after the coontroller manager finishes preparing for the migration lock.
// MigrationReady is closed after the controller manager finishes preparing for the migration lock.
// After this point, the leader migration process will proceed to acquire the migration lock.
MigrationReady chan struct{}
config *internal.LeaderMigrationConfiguration
migratedControllers map[string]bool
// component indicates the name of the control-plane component that uses leader migration,
// which should be a controller manager, i.e. kube-controller-manager or cloud-controller-manager
component string
// FilterFunc returns a FilterResult telling the controller manager what to do with the controller.
FilterFunc FilterFunc
}
// FilterFunc takes a name of controller, returning whether the controller should be started.
type FilterFunc func(controllerName string) bool
// NewLeaderMigrator creates a LeaderMigrator with given config for the given component. The component
// NewLeaderMigrator creates a LeaderMigrator with given config for the given component. component
// indicates which controller manager is requesting this leader migration, and it should be consistent
// with the component field of ControllerLeaderConfiguration.
func NewLeaderMigrator(config *internal.LeaderMigrationConfiguration, component string) *LeaderMigrator {
@ -46,27 +40,22 @@ func NewLeaderMigrator(config *internal.LeaderMigrationConfiguration, component
}
return &LeaderMigrator{
MigrationReady: make(chan struct{}),
config: config,
migratedControllers: migratedControllers,
component: component,
}
}
// FilterFunc returns the filter function that, when migrated == true
// - returns true if the controller should start under the migration lock
// - returns false if the controller should start under the main lock
// when migrated == false, the result is inverted.
func (m *LeaderMigrator) FilterFunc(migrated bool) FilterFunc {
return func(controllerName string) bool {
shouldRun, ok := m.migratedControllers[controllerName]
if !ok {
// The controller is not included in the migration
// If the caller wants the controllers outside migration, then we should include it.
return !migrated
}
FilterFunc: func(controllerName string) FilterResult {
shouldRun, ok := migratedControllers[controllerName]
if ok {
// The controller is included in the migration
// If the caller wants the controllers within migration, we should only include it
// if current component should run the controller
return migrated && shouldRun
if shouldRun {
// If the controller manager should run the controller,
// start it in the migration lock.
return ControllerMigrated
}
// Otherwise, the controller should be started by
// some other controller manager.
return ControllerUnowned
}
// The controller is not included in the migration,
// and should be started in the main lock.
return ControllerNonMigrated
},
}
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leadermigration
import (
"testing"
internal "k8s.io/controller-manager/config"
)
func TestLeaderMigratorFilterFunc(t *testing.T) {
fromConfig := &internal.LeaderMigrationConfiguration{
ResourceLock: "leases",
LeaderName: "cloud-provider-extraction-migration",
ControllerLeaders: []internal.ControllerLeaderConfiguration{
{
Name: "route",
Component: "kube-controller-manager",
}, {
Name: "service",
Component: "kube-controller-manager",
}, {
Name: "cloud-node-lifecycle",
Component: "kube-controller-manager",
},
},
}
toConfig := &internal.LeaderMigrationConfiguration{
ResourceLock: "leases",
LeaderName: "cloud-provider-extraction-migration",
ControllerLeaders: []internal.ControllerLeaderConfiguration{
{
Name: "route",
Component: "cloud-controller-manager",
}, {
Name: "service",
Component: "cloud-controller-manager",
}, {
Name: "cloud-node-lifecycle",
Component: "cloud-controller-manager",
},
},
}
for _, tc := range []struct {
name string
config *internal.LeaderMigrationConfiguration
component string
migrated bool
expectResult map[string]FilterResult
}{
{
name: "from config, kcm",
config: fromConfig,
component: "kube-controller-manager",
expectResult: map[string]FilterResult{
"deployment": ControllerNonMigrated,
"route": ControllerMigrated,
"service": ControllerMigrated,
"cloud-node-lifecycle": ControllerMigrated,
},
},
{
name: "from config, ccm",
config: fromConfig,
component: "cloud-controller-manager",
expectResult: map[string]FilterResult{
"cloud-node": ControllerNonMigrated,
"route": ControllerUnowned,
"service": ControllerUnowned,
"cloud-node-lifecycle": ControllerUnowned,
},
},
{
name: "to config, kcm",
config: toConfig,
component: "kube-controller-manager",
expectResult: map[string]FilterResult{
"deployment": ControllerNonMigrated,
"route": ControllerUnowned,
"service": ControllerUnowned,
"cloud-node-lifecycle": ControllerUnowned,
},
},
{
name: "to config, ccm",
config: toConfig,
component: "cloud-controller-manager",
expectResult: map[string]FilterResult{
"cloud-node": ControllerNonMigrated,
"route": ControllerMigrated,
"service": ControllerMigrated,
"cloud-node-lifecycle": ControllerMigrated,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
migrator := NewLeaderMigrator(tc.config, tc.component)
for name, expected := range tc.expectResult {
if result := migrator.FilterFunc(name); expected != result {
t.Errorf("controller %s, expect %v, got %v", name, expected, result)
}
}
})
}
}