Merge pull request #57932 from atlassian/cancellable-leader-election

Automatic merge from submit-queue (batch tested with PRs 65256, 64236, 64919, 64879, 57932). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Cancellable leader election

**What this PR does / why we need it**:
Adds ability to cancel leader election. Useful in integration tests where the whole app is started and stopped in each test.

**Special notes for your reviewer**:
I used the `context` package - it is impossible/hard to achieve the same behaviour with just channels without spawning additional goroutines but it is trivial with `context`. See `acquire()` and `renew()` methods.

**Release note**:

```release-note
NONE
```
/kind enhancement
/sig api-machinery
This commit is contained in:
Kubernetes Submit Queue 2018-06-20 17:22:22 -07:00 committed by GitHub
commit 571b9beac5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 61 additions and 37 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package app package app
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
@ -135,7 +136,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
} }
} }
run := func(stop <-chan struct{}) { run := func(ctx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{ rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig, ClientConfig: c.Kubeconfig,
} }
@ -151,13 +152,13 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
clientBuilder = rootClientBuilder clientBuilder = rootClientBuilder
} }
if err := startControllers(c, rootClientBuilder, clientBuilder, stop, cloud); err != nil { if err := startControllers(c, rootClientBuilder, clientBuilder, ctx.Done(), cloud); err != nil {
glog.Fatalf("error running controllers: %v", err) glog.Fatalf("error running controllers: %v", err)
} }
} }
if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
run(nil) run(context.TODO())
panic("unreachable") panic("unreachable")
} }
@ -183,7 +184,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
} }
// Try and become the leader and start cloud controller manager loops // Try and become the leader and start cloud controller manager loops
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl, Lock: rl,
LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration,

View File

@ -21,6 +21,7 @@ limitations under the License.
package app package app
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
@ -144,7 +145,7 @@ func Run(c *config.CompletedConfig) error {
} }
} }
run := func(stop <-chan struct{}) { run := func(ctx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{ rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig, ClientConfig: c.Kubeconfig,
} }
@ -164,24 +165,24 @@ func Run(c *config.CompletedConfig) error {
} else { } else {
clientBuilder = rootClientBuilder clientBuilder = rootClientBuilder
} }
ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop) controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil { if err != nil {
glog.Fatalf("error building controller context: %v", err) glog.Fatalf("error building controller context: %v", err)
} }
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil { if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode)); err != nil {
glog.Fatalf("error starting controllers: %v", err) glog.Fatalf("error starting controllers: %v", err)
} }
ctx.InformerFactory.Start(ctx.Stop) controllerContext.InformerFactory.Start(controllerContext.Stop)
close(ctx.InformersStarted) close(controllerContext.InformersStarted)
select {} select {}
} }
if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
run(wait.NeverStop) run(context.TODO())
panic("unreachable") panic("unreachable")
} }
@ -204,7 +205,7 @@ func Run(c *config.CompletedConfig) error {
glog.Fatalf("error creating lock: %v", err) glog.Fatalf("error creating lock: %v", err)
} }
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl, Lock: rl,
LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration,

View File

@ -18,6 +18,7 @@ limitations under the License.
package app package app
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -181,11 +182,22 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced) controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)
// Prepare a reusable run function. // Prepare a reusable run function.
run := func(stopCh <-chan struct{}) { run := func(ctx context.Context) {
sched.Run() sched.Run()
<-stopCh <-ctx.Done()
} }
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
// If leader election is enabled, run via LeaderElector until done and exit. // If leader election is enabled, run via LeaderElector until done and exit.
if c.LeaderElection != nil { if c.LeaderElection != nil {
c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
@ -199,13 +211,13 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
return fmt.Errorf("couldn't create leader elector: %v", err) return fmt.Errorf("couldn't create leader elector: %v", err)
} }
leaderElector.Run() leaderElector.Run(ctx)
return fmt.Errorf("lost lease") return fmt.Errorf("lost lease")
} }
// Leader election is disabled, so run inline until done. // Leader election is disabled, so run inline until done.
run(stopCh) run(ctx)
return fmt.Errorf("finished without leader elect") return fmt.Errorf("finished without leader elect")
} }

View File

@ -49,6 +49,7 @@ limitations under the License.
package leaderelection package leaderelection
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"time" "time"
@ -119,7 +120,7 @@ type LeaderElectionConfig struct {
// * OnChallenge() // * OnChallenge()
type LeaderCallbacks struct { type LeaderCallbacks struct {
// OnStartedLeading is called when a LeaderElector client starts leading // OnStartedLeading is called when a LeaderElector client starts leading
OnStartedLeading func(stop <-chan struct{}) OnStartedLeading func(context.Context)
// OnStoppedLeading is called when a LeaderElector client stops leading // OnStoppedLeading is called when a LeaderElector client stops leading
OnStoppedLeading func() OnStoppedLeading func()
// OnNewLeader is called when the client observes a leader that is // OnNewLeader is called when the client observes a leader that is
@ -145,26 +146,28 @@ type LeaderElector struct {
} }
// Run starts the leader election loop // Run starts the leader election loop
func (le *LeaderElector) Run() { func (le *LeaderElector) Run(ctx context.Context) {
defer func() { defer func() {
runtime.HandleCrash() runtime.HandleCrash()
le.config.Callbacks.OnStoppedLeading() le.config.Callbacks.OnStoppedLeading()
}() }()
le.acquire() if !le.acquire(ctx) {
stop := make(chan struct{}) return // ctx signalled done
go le.config.Callbacks.OnStartedLeading(stop) }
le.renew() ctx, cancel := context.WithCancel(ctx)
close(stop) defer cancel()
go le.config.Callbacks.OnStartedLeading(ctx)
le.renew(ctx)
} }
// RunOrDie starts a client with the provided config or panics if the config // RunOrDie starts a client with the provided config or panics if the config
// fails to validate. // fails to validate.
func RunOrDie(lec LeaderElectionConfig) { func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
le, err := NewLeaderElector(lec) le, err := NewLeaderElector(lec)
if err != nil { if err != nil {
panic(err) panic(err)
} }
le.Run() le.Run(ctx)
} }
// GetLeader returns the identity of the last observed leader or returns the empty string if // GetLeader returns the identity of the last observed leader or returns the empty string if
@ -178,13 +181,16 @@ func (le *LeaderElector) IsLeader() bool {
return le.observedRecord.HolderIdentity == le.config.Lock.Identity() return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
} }
// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds. // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
func (le *LeaderElector) acquire() { // Returns false if ctx signals done.
stop := make(chan struct{}) func (le *LeaderElector) acquire(ctx context.Context) bool {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
succeeded := false
desc := le.config.Lock.Describe() desc := le.config.Lock.Describe()
glog.Infof("attempting to acquire leader lease %v...", desc) glog.Infof("attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() { wait.JitterUntil(func() {
succeeded := le.tryAcquireOrRenew() succeeded = le.tryAcquireOrRenew()
le.maybeReportTransition() le.maybeReportTransition()
if !succeeded { if !succeeded {
glog.V(4).Infof("failed to acquire lease %v", desc) glog.V(4).Infof("failed to acquire lease %v", desc)
@ -192,17 +198,21 @@ func (le *LeaderElector) acquire() {
} }
le.config.Lock.RecordEvent("became leader") le.config.Lock.RecordEvent("became leader")
glog.Infof("successfully acquired lease %v", desc) glog.Infof("successfully acquired lease %v", desc)
close(stop) cancel()
}, le.config.RetryPeriod, JitterFactor, true, stop) }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
return succeeded
} }
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails. // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew() { func (le *LeaderElector) renew(ctx context.Context) {
stop := make(chan struct{}) ctx, cancel := context.WithCancel(ctx)
defer cancel()
wait.Until(func() { wait.Until(func() {
err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) { timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(), nil return le.tryAcquireOrRenew(), nil
}) }, timeoutCtx.Done())
le.maybeReportTransition() le.maybeReportTransition()
desc := le.config.Lock.Describe() desc := le.config.Lock.Describe()
if err == nil { if err == nil {
@ -211,8 +221,8 @@ func (le *LeaderElector) renew() {
} }
le.config.Lock.RecordEvent("stopped leading") le.config.Lock.RecordEvent("stopped leading")
glog.Infof("failed to renew lease %v: %v", desc, err) glog.Infof("failed to renew lease %v: %v", desc, err)
close(stop) cancel()
}, 0, stop) }, 0, ctx.Done())
} }
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,