Cancellable leader election with channels

Kubernetes-commit: 1d99fff1acb1503755b94d4c72e6dedd35c2d249
This commit is contained in:
Mikhail Mazurskiy 2018-02-12 20:48:53 +11:00 committed by Kubernetes Publisher
parent c4be56f535
commit 79cc4033c7

View File

@ -51,6 +51,7 @@ package leaderelection
import (
"fmt"
"reflect"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
@ -145,26 +146,28 @@ type LeaderElector struct {
}
// Run starts the leader election loop
func (le *LeaderElector) Run() {
func (le *LeaderElector) Run(stop <-chan struct{}) {
defer func() {
runtime.HandleCrash()
le.config.Callbacks.OnStoppedLeading()
}()
le.acquire()
stop := make(chan struct{})
go le.config.Callbacks.OnStartedLeading(stop)
le.renew()
close(stop)
if !le.acquire(stop) {
return // stop signalled done
}
internalStop := make(chan struct{})
defer close(internalStop)
go le.config.Callbacks.OnStartedLeading(internalStop)
le.renew(stop)
}
// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
func RunOrDie(lec LeaderElectionConfig) {
func RunOrDie(stop <-chan struct{}, lec LeaderElectionConfig) {
le, err := NewLeaderElector(lec)
if err != nil {
panic(err)
}
le.Run()
le.Run(stop)
}
// GetLeader returns the identity of the last observed leader or returns the empty string if
@ -178,13 +181,23 @@ func (le *LeaderElector) IsLeader() bool {
return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
}
// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds.
func (le *LeaderElector) acquire() {
stop := make(chan struct{})
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if stop signals done.
func (le *LeaderElector) acquire(stop <-chan struct{}) bool {
tmpStop := make(chan struct{})
once := sync.Once{}
go func() {
select {
case <-stop:
once.Do(func() { close(tmpStop) })
case <-tmpStop:
}
}()
succeeded := false
desc := le.config.Lock.Describe()
glog.Infof("attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() {
succeeded := le.tryAcquireOrRenew()
succeeded = le.tryAcquireOrRenew()
le.maybeReportTransition()
if !succeeded {
glog.V(4).Infof("failed to acquire lease %v", desc)
@ -192,17 +205,41 @@ func (le *LeaderElector) acquire() {
}
le.config.Lock.RecordEvent("became leader")
glog.Infof("successfully acquired lease %v", desc)
close(stop)
}, le.config.RetryPeriod, JitterFactor, true, stop)
once.Do(func() { close(tmpStop) })
}, le.config.RetryPeriod, JitterFactor, true, tmpStop)
return succeeded
}
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails.
func (le *LeaderElector) renew() {
stop := make(chan struct{})
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(stop <-chan struct{}) {
tmpStop := make(chan struct{})
once := sync.Once{}
go func() {
select {
case <-stop:
once.Do(func() { close(tmpStop) })
case <-tmpStop:
}
}()
wait.Until(func() {
err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) {
// PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod
t := time.NewTimer(le.config.RetryPeriod + le.config.RenewDeadline)
defer t.Stop()
internalStop := make(chan struct{})
internalOnce := sync.Once{}
defer internalOnce.Do(func() { close(internalStop) })
go func() {
select {
case <-tmpStop:
internalOnce.Do(func() { close(internalStop) })
case <-t.C:
internalOnce.Do(func() { close(internalStop) })
case <-internalStop:
}
}()
err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(), nil
})
}, internalStop)
le.maybeReportTransition()
desc := le.config.Lock.Describe()
if err == nil {
@ -211,8 +248,8 @@ func (le *LeaderElector) renew() {
}
le.config.Lock.RecordEvent("stopped leading")
glog.Infof("failed to renew lease %v: %v", desc, err)
close(stop)
}, 0, stop)
once.Do(func() { close(tmpStop) })
}, 0, tmpStop)
}
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,