diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 81c66c1969b..36508b0a6ca 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -212,12 +212,12 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController - run := func(ctx context.Context, startSATokenController InitFunc, filterFunc leadermigration.FilterFunc) { + run := func(ctx context.Context, startSATokenController InitFunc, filterFunc leadermigration.FilterFunc, requiredResult leadermigration.FilterResult) { controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done()) if err != nil { klog.Fatalf("error building controller context: %v", err) } - controllerInitializers := filterInitializers(NewControllerInitializers(controllerContext.LoopMode), filterFunc) + controllerInitializers := filterInitializers(NewControllerInitializers(controllerContext.LoopMode), filterFunc, requiredResult) if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux); err != nil { klog.Fatalf("error starting controllers: %v", err) } @@ -231,7 +231,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { // No leader election, run directly if !c.ComponentConfig.Generic.LeaderElection.LeaderElect { - run(context.TODO(), saTokenControllerInitFunc, nil) + run(context.TODO(), saTokenControllerInitFunc, nil, leadermigration.ControllerNonMigrated) panic("unreachable") } @@ -274,10 +274,10 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { if leaderMigrator != nil { // If leader migration is enabled, we should start only non-migrated controllers // for the main lock. - filterFunc = leaderMigrator.FilterFunc(false) + filterFunc = leaderMigrator.FilterFunc klog.Info("leader migration: starting main controllers.") } - run(ctx, startSATokenController, filterFunc) + run(ctx, startSATokenController, filterFunc, leadermigration.ControllerNonMigrated) }, OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") @@ -300,7 +300,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { OnStartedLeading: func(ctx context.Context) { klog.Info("leader migration: starting migrated controllers.") // DO NOT start saTokenController under migration lock (passing nil) - run(ctx, nil, leaderMigrator.FilterFunc(true)) + run(ctx, nil, leaderMigrator.FilterFunc, leadermigration.ControllerMigrated) }, OnStoppedLeading: func() { klog.Fatalf("migration leaderelection lost") @@ -688,16 +688,16 @@ func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionC panic("unreachable") } -// filterInitializers returns initializers that has filterFunc(name) == true. +// filterInitializers returns initializers that has filterFunc(name) == expected. // filterFunc can be nil, in which case the original initializers will be returned directly. // InitFunc is local to kube-controller-manager, and thus filterInitializers has to be local too. -func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc) map[string]InitFunc { +func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) map[string]InitFunc { if filterFunc == nil { return allInitializers } initializers := make(map[string]InitFunc) for name, initFunc := range allInitializers { - if filterFunc(name) { + if filterFunc(name) == expected { initializers[name] = initFunc } } diff --git a/staging/src/k8s.io/cloud-provider/app/controllermanager.go b/staging/src/k8s.io/cloud-provider/app/controllermanager.go index 57ca0eb04a6..82d9ab2da1c 100644 --- a/staging/src/k8s.io/cloud-provider/app/controllermanager.go +++ b/staging/src/k8s.io/cloud-provider/app/controllermanager.go @@ -203,10 +203,6 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface // leaderMigrator will be non-nil if and only if Leader Migration is enabled. var leaderMigrator *leadermigration.LeaderMigrator = nil - // Leader migration requires splitting initializers for the main and the migration lock - // If leader migration is not enabled, these two will be unused. - var migratedInitializers, unmigratedInitializers map[string]InitFunc - // If leader migration is enabled, use the redirected initialization // Check feature gate and configuration separately so that any error in configuration checking will not // affect the result if the feature is not enabled. @@ -215,9 +211,6 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration, "cloud-controller-manager") - - // Split initializers based on which lease they require, main lease or migration lease. - migratedInitializers, unmigratedInitializers = divideInitializers(controllerInitializers, leaderMigrator.FilterFunc(true)) } // Start the main lock @@ -230,7 +223,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface if leaderMigrator != nil { // If leader migration is enabled, we should start only non-migrated controllers // for the main lock. - initializers = unmigratedInitializers + initializers = filterInitializers(controllerInitializers, leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated) klog.Info("leader migration: starting main controllers.") // Signal the main lock is acquired, and thus migration lock is ready to attempt. close(leaderMigrator.MigrationReady) @@ -254,7 +247,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { klog.Info("leader migration: starting migrated controllers.") - run(ctx, migratedInitializers) + run(ctx, filterInitializers(controllerInitializers, leaderMigrator.FilterFunc, leadermigration.ControllerMigrated)) }, OnStoppedLeading: func() { klog.Fatalf("migration leaderelection lost") @@ -481,23 +474,18 @@ func leaderElectAndRun(c *cloudcontrollerconfig.CompletedConfig, lockIdentity st panic("unreachable") } -// divideInitializers applies filterFunc to each of the given intializiers. -// filtered contains all controllers that have filterFunc(name) == true -// rest contains all controllers that have filterFunc(name) == false -// InitFunc is local to cloud-controller-manager, and thus divideInitializers has to be local too. -func divideInitializers(allInitializers map[string]InitFunc, - filterFunc leadermigration.FilterFunc) (filtered map[string]InitFunc, rest map[string]InitFunc) { +// filterInitializers returns initializers that has filterFunc(name) == expected. +// filterFunc can be nil, in which case the original initializers will be returned directly. +// InitFunc is local to cloud-controller-manager, and thus filterInitializers has to be local too. +func filterInitializers(allInitializers map[string]InitFunc, filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) map[string]InitFunc { if filterFunc == nil { - return make(map[string]InitFunc), allInitializers + return allInitializers } - filtered = make(map[string]InitFunc) - rest = make(map[string]InitFunc) + initializers := make(map[string]InitFunc) for name, initFunc := range allInitializers { - if filterFunc(name) { - filtered[name] = initFunc - } else { - rest[name] = initFunc + if filterFunc(name) == expected { + initializers[name] = initFunc } } - return + return initializers } diff --git a/staging/src/k8s.io/controller-manager/pkg/leadermigration/filter.go b/staging/src/k8s.io/controller-manager/pkg/leadermigration/filter.go new file mode 100644 index 00000000000..34496a1c14d --- /dev/null +++ b/staging/src/k8s.io/controller-manager/pkg/leadermigration/filter.go @@ -0,0 +1,35 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leadermigration + +// FilterResult indicates whether and how the controller manager should start the controller. +type FilterResult int32 + +const ( + // ControllerOwned indicates that the controller is owned by another controller manager + // and thus should NOT be started by this controller manager. + ControllerUnowned = iota + // ControllerMigrated indicates that the controller manager should start this controller + // with thte migration lock. + ControllerMigrated + // ControllerNonMigrated indicates that the controller manager should start this controller + // with thte main lock. + ControllerNonMigrated +) + +// FilterFunc takes a name of controller, returning a FilterResult indicating how to start controller. +type FilterFunc func(controllerName string) FilterResult diff --git a/staging/src/k8s.io/controller-manager/pkg/leadermigration/migrator.go b/staging/src/k8s.io/controller-manager/pkg/leadermigration/migrator.go index a1001ed9e00..3ff19e3c1d0 100644 --- a/staging/src/k8s.io/controller-manager/pkg/leadermigration/migrator.go +++ b/staging/src/k8s.io/controller-manager/pkg/leadermigration/migrator.go @@ -22,21 +22,15 @@ import ( // LeaderMigrator holds information required by the leader migration process. type LeaderMigrator struct { - // MigrationReady is closed after the coontroller manager finishes preparing for the migration lock. + // MigrationReady is closed after the controller manager finishes preparing for the migration lock. // After this point, the leader migration process will proceed to acquire the migration lock. MigrationReady chan struct{} - config *internal.LeaderMigrationConfiguration - migratedControllers map[string]bool - // component indicates the name of the control-plane component that uses leader migration, - // which should be a controller manager, i.e. kube-controller-manager or cloud-controller-manager - component string + // FilterFunc returns a FilterResult telling the controller manager what to do with the controller. + FilterFunc FilterFunc } -// FilterFunc takes a name of controller, returning whether the controller should be started. -type FilterFunc func(controllerName string) bool - -// NewLeaderMigrator creates a LeaderMigrator with given config for the given component. The component +// NewLeaderMigrator creates a LeaderMigrator with given config for the given component. component // indicates which controller manager is requesting this leader migration, and it should be consistent // with the component field of ControllerLeaderConfiguration. func NewLeaderMigrator(config *internal.LeaderMigrationConfiguration, component string) *LeaderMigrator { @@ -45,28 +39,23 @@ func NewLeaderMigrator(config *internal.LeaderMigrationConfiguration, component migratedControllers[leader.Name] = leader.Component == component } return &LeaderMigrator{ - MigrationReady: make(chan struct{}), - config: config, - migratedControllers: migratedControllers, - component: component, - } -} - -// FilterFunc returns the filter function that, when migrated == true -// - returns true if the controller should start under the migration lock -// - returns false if the controller should start under the main lock -// when migrated == false, the result is inverted. -func (m *LeaderMigrator) FilterFunc(migrated bool) FilterFunc { - return func(controllerName string) bool { - shouldRun, ok := m.migratedControllers[controllerName] - if !ok { - // The controller is not included in the migration - // If the caller wants the controllers outside migration, then we should include it. - return !migrated - } - // The controller is included in the migration - // If the caller wants the controllers within migration, we should only include it - // if current component should run the controller - return migrated && shouldRun + MigrationReady: make(chan struct{}), + FilterFunc: func(controllerName string) FilterResult { + shouldRun, ok := migratedControllers[controllerName] + if ok { + // The controller is included in the migration + if shouldRun { + // If the controller manager should run the controller, + // start it in the migration lock. + return ControllerMigrated + } + // Otherwise, the controller should be started by + // some other controller manager. + return ControllerUnowned + } + // The controller is not included in the migration, + // and should be started in the main lock. + return ControllerNonMigrated + }, } } diff --git a/staging/src/k8s.io/controller-manager/pkg/leadermigration/migrator_test.go b/staging/src/k8s.io/controller-manager/pkg/leadermigration/migrator_test.go new file mode 100644 index 00000000000..104990405dc --- /dev/null +++ b/staging/src/k8s.io/controller-manager/pkg/leadermigration/migrator_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leadermigration + +import ( + "testing" + + internal "k8s.io/controller-manager/config" +) + +func TestLeaderMigratorFilterFunc(t *testing.T) { + fromConfig := &internal.LeaderMigrationConfiguration{ + ResourceLock: "leases", + LeaderName: "cloud-provider-extraction-migration", + ControllerLeaders: []internal.ControllerLeaderConfiguration{ + { + Name: "route", + Component: "kube-controller-manager", + }, { + Name: "service", + Component: "kube-controller-manager", + }, { + Name: "cloud-node-lifecycle", + Component: "kube-controller-manager", + }, + }, + } + toConfig := &internal.LeaderMigrationConfiguration{ + ResourceLock: "leases", + LeaderName: "cloud-provider-extraction-migration", + ControllerLeaders: []internal.ControllerLeaderConfiguration{ + { + Name: "route", + Component: "cloud-controller-manager", + }, { + Name: "service", + Component: "cloud-controller-manager", + }, { + Name: "cloud-node-lifecycle", + Component: "cloud-controller-manager", + }, + }, + } + for _, tc := range []struct { + name string + config *internal.LeaderMigrationConfiguration + component string + migrated bool + expectResult map[string]FilterResult + }{ + { + name: "from config, kcm", + config: fromConfig, + component: "kube-controller-manager", + expectResult: map[string]FilterResult{ + "deployment": ControllerNonMigrated, + "route": ControllerMigrated, + "service": ControllerMigrated, + "cloud-node-lifecycle": ControllerMigrated, + }, + }, + { + name: "from config, ccm", + config: fromConfig, + component: "cloud-controller-manager", + expectResult: map[string]FilterResult{ + "cloud-node": ControllerNonMigrated, + "route": ControllerUnowned, + "service": ControllerUnowned, + "cloud-node-lifecycle": ControllerUnowned, + }, + }, + { + name: "to config, kcm", + config: toConfig, + component: "kube-controller-manager", + expectResult: map[string]FilterResult{ + "deployment": ControllerNonMigrated, + "route": ControllerUnowned, + "service": ControllerUnowned, + "cloud-node-lifecycle": ControllerUnowned, + }, + }, + { + name: "to config, ccm", + config: toConfig, + component: "cloud-controller-manager", + expectResult: map[string]FilterResult{ + "cloud-node": ControllerNonMigrated, + "route": ControllerMigrated, + "service": ControllerMigrated, + "cloud-node-lifecycle": ControllerMigrated, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + migrator := NewLeaderMigrator(tc.config, tc.component) + for name, expected := range tc.expectResult { + if result := migrator.FilterFunc(name); expected != result { + t.Errorf("controller %s, expect %v, got %v", name, expected, result) + } + } + }) + } +}