Cancellable leader election with context

Kubernetes-commit: dc32a341c01ec122f54604e9fdbdf9b77d2e19e3
This commit is contained in:
Mikhail Mazurskiy 2018-02-12 21:02:56 +11:00 committed by Kubernetes Publisher
parent 79cc4033c7
commit ad39df114e

View File

@ -49,9 +49,9 @@ limitations under the License.
package leaderelection package leaderelection
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"sync"
"time" "time"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
@ -120,7 +120,7 @@ type LeaderElectionConfig struct {
// * OnChallenge() // * OnChallenge()
type LeaderCallbacks struct { type LeaderCallbacks struct {
// OnStartedLeading is called when a LeaderElector client starts leading // OnStartedLeading is called when a LeaderElector client starts leading
OnStartedLeading func(stop <-chan struct{}) OnStartedLeading func(context.Context)
// OnStoppedLeading is called when a LeaderElector client stops leading // OnStoppedLeading is called when a LeaderElector client stops leading
OnStoppedLeading func() OnStoppedLeading func()
// OnNewLeader is called when the client observes a leader that is // OnNewLeader is called when the client observes a leader that is
@ -146,28 +146,28 @@ type LeaderElector struct {
} }
// Run starts the leader election loop // Run starts the leader election loop
func (le *LeaderElector) Run(stop <-chan struct{}) { func (le *LeaderElector) Run(ctx context.Context) {
defer func() { defer func() {
runtime.HandleCrash() runtime.HandleCrash()
le.config.Callbacks.OnStoppedLeading() le.config.Callbacks.OnStoppedLeading()
}() }()
if !le.acquire(stop) { if !le.acquire(ctx) {
return // stop signalled done return // ctx signalled done
} }
internalStop := make(chan struct{}) ctx, cancel := context.WithCancel(ctx)
defer close(internalStop) defer cancel()
go le.config.Callbacks.OnStartedLeading(internalStop) go le.config.Callbacks.OnStartedLeading(ctx)
le.renew(stop) le.renew(ctx)
} }
// RunOrDie starts a client with the provided config or panics if the config // RunOrDie starts a client with the provided config or panics if the config
// fails to validate. // fails to validate.
func RunOrDie(stop <-chan struct{}, lec LeaderElectionConfig) { func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
le, err := NewLeaderElector(lec) le, err := NewLeaderElector(lec)
if err != nil { if err != nil {
panic(err) panic(err)
} }
le.Run(stop) le.Run(ctx)
} }
// GetLeader returns the identity of the last observed leader or returns the empty string if // GetLeader returns the identity of the last observed leader or returns the empty string if
@ -182,17 +182,10 @@ func (le *LeaderElector) IsLeader() bool {
} }
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if stop signals done. // Returns false if ctx signals done.
func (le *LeaderElector) acquire(stop <-chan struct{}) bool { func (le *LeaderElector) acquire(ctx context.Context) bool {
tmpStop := make(chan struct{}) ctx, cancel := context.WithCancel(ctx)
once := sync.Once{} defer cancel()
go func() {
select {
case <-stop:
once.Do(func() { close(tmpStop) })
case <-tmpStop:
}
}()
succeeded := false succeeded := false
desc := le.config.Lock.Describe() desc := le.config.Lock.Describe()
glog.Infof("attempting to acquire leader lease %v...", desc) glog.Infof("attempting to acquire leader lease %v...", desc)
@ -205,41 +198,22 @@ func (le *LeaderElector) acquire(stop <-chan struct{}) bool {
} }
le.config.Lock.RecordEvent("became leader") le.config.Lock.RecordEvent("became leader")
glog.Infof("successfully acquired lease %v", desc) glog.Infof("successfully acquired lease %v", desc)
once.Do(func() { close(tmpStop) }) cancel()
}, le.config.RetryPeriod, JitterFactor, true, tmpStop) }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
return succeeded return succeeded
} }
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(stop <-chan struct{}) { func (le *LeaderElector) renew(ctx context.Context) {
tmpStop := make(chan struct{}) ctx, cancel := context.WithCancel(ctx)
once := sync.Once{} defer cancel()
go func() {
select {
case <-stop:
once.Do(func() { close(tmpStop) })
case <-tmpStop:
}
}()
wait.Until(func() { wait.Until(func() {
// PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod // PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod
t := time.NewTimer(le.config.RetryPeriod + le.config.RenewDeadline) timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RetryPeriod+le.config.RenewDeadline)
defer t.Stop() defer timeoutCancel()
internalStop := make(chan struct{})
internalOnce := sync.Once{}
defer internalOnce.Do(func() { close(internalStop) })
go func() {
select {
case <-tmpStop:
internalOnce.Do(func() { close(internalStop) })
case <-t.C:
internalOnce.Do(func() { close(internalStop) })
case <-internalStop:
}
}()
err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) { err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(), nil return le.tryAcquireOrRenew(), nil
}, internalStop) }, timeoutCtx.Done())
le.maybeReportTransition() le.maybeReportTransition()
desc := le.config.Lock.Describe() desc := le.config.Lock.Describe()
if err == nil { if err == nil {
@ -248,8 +222,8 @@ func (le *LeaderElector) renew(stop <-chan struct{}) {
} }
le.config.Lock.RecordEvent("stopped leading") le.config.Lock.RecordEvent("stopped leading")
glog.Infof("failed to renew lease %v: %v", desc, err) glog.Infof("failed to renew lease %v: %v", desc, err)
once.Do(func() { close(tmpStop) }) cancel()
}, 0, tmpStop) }, 0, ctx.Done())
} }
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,