diff --git a/tools/leaderelection/leaderelection.go b/tools/leaderelection/leaderelection.go index cf56069d..63c29189 100644 --- a/tools/leaderelection/leaderelection.go +++ b/tools/leaderelection/leaderelection.go @@ -49,9 +49,9 @@ limitations under the License. package leaderelection import ( + "context" "fmt" "reflect" - "sync" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -120,7 +120,7 @@ type LeaderElectionConfig struct { // * OnChallenge() type LeaderCallbacks struct { // OnStartedLeading is called when a LeaderElector client starts leading - OnStartedLeading func(stop <-chan struct{}) + OnStartedLeading func(context.Context) // OnStoppedLeading is called when a LeaderElector client stops leading OnStoppedLeading func() // OnNewLeader is called when the client observes a leader that is @@ -146,28 +146,28 @@ type LeaderElector struct { } // Run starts the leader election loop -func (le *LeaderElector) Run(stop <-chan struct{}) { +func (le *LeaderElector) Run(ctx context.Context) { defer func() { runtime.HandleCrash() le.config.Callbacks.OnStoppedLeading() }() - if !le.acquire(stop) { - return // stop signalled done + if !le.acquire(ctx) { + return // ctx signalled done } - internalStop := make(chan struct{}) - defer close(internalStop) - go le.config.Callbacks.OnStartedLeading(internalStop) - le.renew(stop) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go le.config.Callbacks.OnStartedLeading(ctx) + le.renew(ctx) } // RunOrDie starts a client with the provided config or panics if the config // fails to validate. -func RunOrDie(stop <-chan struct{}, lec LeaderElectionConfig) { +func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { le, err := NewLeaderElector(lec) if err != nil { panic(err) } - le.Run(stop) + le.Run(ctx) } // GetLeader returns the identity of the last observed leader or returns the empty string if @@ -182,17 +182,10 @@ func (le *LeaderElector) IsLeader() bool { } // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. -// Returns false if stop signals done. -func (le *LeaderElector) acquire(stop <-chan struct{}) bool { - tmpStop := make(chan struct{}) - once := sync.Once{} - go func() { - select { - case <-stop: - once.Do(func() { close(tmpStop) }) - case <-tmpStop: - } - }() +// Returns false if ctx signals done. +func (le *LeaderElector) acquire(ctx context.Context) bool { + ctx, cancel := context.WithCancel(ctx) + defer cancel() succeeded := false desc := le.config.Lock.Describe() glog.Infof("attempting to acquire leader lease %v...", desc) @@ -205,41 +198,22 @@ func (le *LeaderElector) acquire(stop <-chan struct{}) bool { } le.config.Lock.RecordEvent("became leader") glog.Infof("successfully acquired lease %v", desc) - once.Do(func() { close(tmpStop) }) - }, le.config.RetryPeriod, JitterFactor, true, tmpStop) + cancel() + }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) return succeeded } // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. -func (le *LeaderElector) renew(stop <-chan struct{}) { - tmpStop := make(chan struct{}) - once := sync.Once{} - go func() { - select { - case <-stop: - once.Do(func() { close(tmpStop) }) - case <-tmpStop: - } - }() +func (le *LeaderElector) renew(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() wait.Until(func() { // PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod - t := time.NewTimer(le.config.RetryPeriod + le.config.RenewDeadline) - defer t.Stop() - internalStop := make(chan struct{}) - internalOnce := sync.Once{} - defer internalOnce.Do(func() { close(internalStop) }) - go func() { - select { - case <-tmpStop: - internalOnce.Do(func() { close(internalStop) }) - case <-t.C: - internalOnce.Do(func() { close(internalStop) }) - case <-internalStop: - } - }() + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RetryPeriod+le.config.RenewDeadline) + defer timeoutCancel() err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) { return le.tryAcquireOrRenew(), nil - }, internalStop) + }, timeoutCtx.Done()) le.maybeReportTransition() desc := le.config.Lock.Describe() if err == nil { @@ -248,8 +222,8 @@ func (le *LeaderElector) renew(stop <-chan struct{}) { } le.config.Lock.RecordEvent("stopped leading") glog.Infof("failed to renew lease %v: %v", desc, err) - once.Do(func() { close(tmpStop) }) - }, 0, tmpStop) + cancel() + }, 0, ctx.Done()) } // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,