diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index 01c4bfe2ba0..bcce3684e13 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -157,7 +157,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { - run(nil) + run(wait.NeverStop) panic("unreachable") } @@ -183,7 +183,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } // Try and become the leader and start cloud controller manager loops - leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(wait.NeverStop, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 321d6e41ba9..1ae69c42154 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -204,7 +204,7 @@ func Run(c *config.CompletedConfig) error { glog.Fatalf("error creating lock: %v", err) } - leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(wait.NeverStop, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index ee776c89b6d..db7e8826059 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -199,7 +199,7 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error return 
fmt.Errorf("couldn't create leader elector: %v", err) } - leaderElector.Run() + leaderElector.Run(stopCh) return fmt.Errorf("lost lease") } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index aed55574a8f..cf56069d51f 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -51,6 +51,7 @@ package leaderelection import ( "fmt" "reflect" + "sync" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -145,26 +146,28 @@ type LeaderElector struct { } // Run starts the leader election loop -func (le *LeaderElector) Run() { +func (le *LeaderElector) Run(stop <-chan struct{}) { defer func() { runtime.HandleCrash() le.config.Callbacks.OnStoppedLeading() }() - le.acquire() - stop := make(chan struct{}) - go le.config.Callbacks.OnStartedLeading(stop) - le.renew() - close(stop) + if !le.acquire(stop) { + return // stop signalled done + } + internalStop := make(chan struct{}) + defer close(internalStop) + go le.config.Callbacks.OnStartedLeading(internalStop) + le.renew(stop) } // RunOrDie starts a client with the provided config or panics if the config // fails to validate. -func RunOrDie(lec LeaderElectionConfig) { +func RunOrDie(stop <-chan struct{}, lec LeaderElectionConfig) { le, err := NewLeaderElector(lec) if err != nil { panic(err) } - le.Run() + le.Run(stop) } // GetLeader returns the identity of the last observed leader or returns the empty string if @@ -178,13 +181,23 @@ func (le *LeaderElector) IsLeader() bool { return le.observedRecord.HolderIdentity == le.config.Lock.Identity() } -// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds. -func (le *LeaderElector) acquire() { - stop := make(chan struct{}) +// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. 
+// Returns false if stop signals done. +func (le *LeaderElector) acquire(stop <-chan struct{}) bool { + tmpStop := make(chan struct{}) + once := sync.Once{} + go func() { + select { + case <-stop: + once.Do(func() { close(tmpStop) }) + case <-tmpStop: + } + }() + succeeded := false desc := le.config.Lock.Describe() glog.Infof("attempting to acquire leader lease %v...", desc) wait.JitterUntil(func() { - succeeded := le.tryAcquireOrRenew() + succeeded = le.tryAcquireOrRenew() le.maybeReportTransition() if !succeeded { glog.V(4).Infof("failed to acquire lease %v", desc) @@ -192,17 +205,41 @@ func (le *LeaderElector) acquire() { } le.config.Lock.RecordEvent("became leader") glog.Infof("successfully acquired lease %v", desc) - close(stop) - }, le.config.RetryPeriod, JitterFactor, true, stop) + once.Do(func() { close(tmpStop) }) + }, le.config.RetryPeriod, JitterFactor, true, tmpStop) + return succeeded } -// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails. -func (le *LeaderElector) renew() { - stop := make(chan struct{}) +// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or stop signals done.
+func (le *LeaderElector) renew(stop <-chan struct{}) { + tmpStop := make(chan struct{}) + once := sync.Once{} + go func() { + select { + case <-stop: + once.Do(func() { close(tmpStop) }) + case <-tmpStop: + } + }() wait.Until(func() { - err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) { + // PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod + t := time.NewTimer(le.config.RetryPeriod + le.config.RenewDeadline) + defer t.Stop() + internalStop := make(chan struct{}) + internalOnce := sync.Once{} + defer internalOnce.Do(func() { close(internalStop) }) + go func() { + select { + case <-tmpStop: + internalOnce.Do(func() { close(internalStop) }) + case <-t.C: + internalOnce.Do(func() { close(internalStop) }) + case <-internalStop: + } + }() + err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) { return le.tryAcquireOrRenew(), nil - }) + }, internalStop) le.maybeReportTransition() desc := le.config.Lock.Describe() if err == nil { @@ -211,8 +248,8 @@ func (le *LeaderElector) renew() { } le.config.Lock.RecordEvent("stopped leading") glog.Infof("failed to renew lease %v: %v", desc, err) - close(stop) - }, 0, stop) + once.Do(func() { close(tmpStop) }) + }, 0, tmpStop) } // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,