From 1d99fff1acb1503755b94d4c72e6dedd35c2d249 Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Mon, 12 Feb 2018 20:48:53 +1100 Subject: [PATCH 1/5] Cancellable leader election with channels --- .../app/controllermanager.go | 4 +- .../app/controllermanager.go | 2 +- cmd/kube-scheduler/app/server.go | 2 +- .../tools/leaderelection/leaderelection.go | 79 ++++++++++++++----- 4 files changed, 62 insertions(+), 25 deletions(-) diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index 01c4bfe2ba0..bcce3684e13 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -157,7 +157,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { - run(nil) + run(wait.NeverStop) panic("unreachable") } @@ -183,7 +183,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } // Try and become the leader and start cloud controller manager loops - leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(wait.NeverStop, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 321d6e41ba9..1ae69c42154 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -204,7 +204,7 @@ func Run(c *config.CompletedConfig) error { glog.Fatalf("error creating lock: %v", err) } - leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(wait.NeverStop, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index ee776c89b6d..db7e8826059 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -199,7 +199,7 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error return fmt.Errorf("couldn't create leader elector: %v", err) } - leaderElector.Run() + leaderElector.Run(stopCh) return fmt.Errorf("lost lease") } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index aed55574a8f..cf56069d51f 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -51,6 +51,7 @@ package leaderelection import ( "fmt" "reflect" + "sync" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -145,26 +146,28 @@ type LeaderElector struct { } // Run starts the leader election loop -func (le *LeaderElector) Run() { +func (le *LeaderElector) Run(stop <-chan struct{}) { defer func() { runtime.HandleCrash() le.config.Callbacks.OnStoppedLeading() }() - le.acquire() - stop := make(chan struct{}) - go le.config.Callbacks.OnStartedLeading(stop) - le.renew() - close(stop) + if !le.acquire(stop) { + return // stop signalled done + } + internalStop := make(chan struct{}) + defer close(internalStop) + go le.config.Callbacks.OnStartedLeading(internalStop) + le.renew(stop) } // RunOrDie starts a client with the provided config or panics if the config // fails to validate. -func RunOrDie(lec LeaderElectionConfig) { +func RunOrDie(stop <-chan struct{}, lec LeaderElectionConfig) { le, err := NewLeaderElector(lec) if err != nil { panic(err) } - le.Run() + le.Run(stop) } // GetLeader returns the identity of the last observed leader or returns the empty string if @@ -178,13 +181,23 @@ func (le *LeaderElector) IsLeader() bool { return le.observedRecord.HolderIdentity == le.config.Lock.Identity() } -// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds. -func (le *LeaderElector) acquire() { - stop := make(chan struct{}) +// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. +// Returns false if stop signals done. +func (le *LeaderElector) acquire(stop <-chan struct{}) bool { + tmpStop := make(chan struct{}) + once := sync.Once{} + go func() { + select { + case <-stop: + once.Do(func() { close(tmpStop) }) + case <-tmpStop: + } + }() + succeeded := false desc := le.config.Lock.Describe() glog.Infof("attempting to acquire leader lease %v...", desc) wait.JitterUntil(func() { - succeeded := le.tryAcquireOrRenew() + succeeded = le.tryAcquireOrRenew() le.maybeReportTransition() if !succeeded { glog.V(4).Infof("failed to acquire lease %v", desc) @@ -192,17 +205,41 @@ func (le *LeaderElector) acquire() { } le.config.Lock.RecordEvent("became leader") glog.Infof("successfully acquired lease %v", desc) - close(stop) - }, le.config.RetryPeriod, JitterFactor, true, stop) + once.Do(func() { close(tmpStop) }) + }, le.config.RetryPeriod, JitterFactor, true, tmpStop) + return succeeded } -// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails. -func (le *LeaderElector) renew() { - stop := make(chan struct{}) +// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. +func (le *LeaderElector) renew(stop <-chan struct{}) { + tmpStop := make(chan struct{}) + once := sync.Once{} + go func() { + select { + case <-stop: + once.Do(func() { close(tmpStop) }) + case <-tmpStop: + } + }() wait.Until(func() { - err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) { + // PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod + t := time.NewTimer(le.config.RetryPeriod + le.config.RenewDeadline) + defer t.Stop() + internalStop := make(chan struct{}) + internalOnce := sync.Once{} + defer internalOnce.Do(func() { close(internalStop) }) + go func() { + select { + case <-tmpStop: + internalOnce.Do(func() { close(internalStop) }) + case <-t.C: + internalOnce.Do(func() { close(internalStop) }) + case <-internalStop: + } + }() + err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) { return le.tryAcquireOrRenew(), nil - }) + }, internalStop) le.maybeReportTransition() desc := le.config.Lock.Describe() if err == nil { @@ -211,8 +248,8 @@ func (le *LeaderElector) renew() { } le.config.Lock.RecordEvent("stopped leading") glog.Infof("failed to renew lease %v: %v", desc, err) - close(stop) - }, 0, stop) + once.Do(func() { close(tmpStop) }) + }, 0, tmpStop) } // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, From dc32a341c01ec122f54604e9fdbdf9b77d2e19e3 Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Mon, 12 Feb 2018 21:02:56 +1100 Subject: [PATCH 2/5] Cancellable leader election with context --- .../app/controllermanager.go | 12 ++- .../app/controllermanager.go | 12 ++- cmd/kube-scheduler/app/server.go | 7 +- .../tools/leaderelection/leaderelection.go | 76 ++++++------------- 4 files changed, 46 insertions(+), 61 deletions(-) diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index bcce3684e13..f0348b42278 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -17,6 +17,7 @@ limitations under the License. package app import ( + "context" "fmt" "math/rand" "net" @@ -135,7 +136,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } } - run := func(stop <-chan struct{}) { + run := func(ctx context.Context) { rootClientBuilder := controller.SimpleControllerClientBuilder{ ClientConfig: c.Kubeconfig, } @@ -151,13 +152,16 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { clientBuilder = rootClientBuilder } - if err := startControllers(c, rootClientBuilder, clientBuilder, stop, cloud); err != nil { + if err := startControllers(c, rootClientBuilder, clientBuilder, ctx.Done(), cloud); err != nil { glog.Fatalf("error running controllers: %v", err) } } + runCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { - run(wait.NeverStop) + run(runCtx) panic("unreachable") } @@ -183,7 +187,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } // Try and become the leader and start cloud controller manager loops - leaderelection.RunOrDie(wait.NeverStop, leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(runCtx, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 1ae69c42154..b465104571a 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -21,6 +21,7 @@ limitations under the License. package app import ( + "context" "fmt" "io/ioutil" "math/rand" @@ -144,7 +145,7 @@ func Run(c *config.CompletedConfig) error { } } - run := func(stop <-chan struct{}) { + run := func(runCtx context.Context) { rootClientBuilder := controller.SimpleControllerClientBuilder{ ClientConfig: c.Kubeconfig, } @@ -164,7 +165,7 @@ func Run(c *config.CompletedConfig) error { } else { clientBuilder = rootClientBuilder } - ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop) + ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, runCtx.Done()) if err != nil { glog.Fatalf("error building controller context: %v", err) } @@ -180,8 +181,11 @@ func Run(c *config.CompletedConfig) error { select {} } + runCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { - run(wait.NeverStop) + run(runCtx) panic("unreachable") } @@ -204,7 +208,7 @@ func Run(c *config.CompletedConfig) error { glog.Fatalf("error creating lock: %v", err) } - leaderelection.RunOrDie(wait.NeverStop, leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(runCtx, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index db7e8826059..c7f34a6ed72 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -18,6 +18,7 @@ limitations under the License. package app import ( + "context" "fmt" "io" "io/ioutil" @@ -189,7 +190,9 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error // If leader election is enabled, run via LeaderElector until done and exit. if c.LeaderElection != nil { c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ - OnStartedLeading: run, + OnStartedLeading: func(ctx context.Context) { + run(ctx.Done()) + }, OnStoppedLeading: func() { utilruntime.HandleError(fmt.Errorf("lost master")) }, @@ -199,7 +202,7 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error return fmt.Errorf("couldn't create leader elector: %v", err) } - leaderElector.Run(stopCh) + leaderElector.Run(context.TODO()) return fmt.Errorf("lost lease") } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index cf56069d51f..63c29189fa9 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -49,9 +49,9 @@ limitations under the License. package leaderelection import ( + "context" "fmt" "reflect" - "sync" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -120,7 +120,7 @@ type LeaderElectionConfig struct { // * OnChallenge() type LeaderCallbacks struct { // OnStartedLeading is called when a LeaderElector client starts leading - OnStartedLeading func(stop <-chan struct{}) + OnStartedLeading func(context.Context) // OnStoppedLeading is called when a LeaderElector client stops leading OnStoppedLeading func() // OnNewLeader is called when the client observes a leader that is @@ -146,28 +146,28 @@ type LeaderElector struct { } // Run starts the leader election loop -func (le *LeaderElector) Run(stop <-chan struct{}) { +func (le *LeaderElector) Run(ctx context.Context) { defer func() { runtime.HandleCrash() le.config.Callbacks.OnStoppedLeading() }() - if !le.acquire(stop) { - return // stop signalled done + if !le.acquire(ctx) { + return // ctx signalled done } - internalStop := make(chan struct{}) - defer close(internalStop) - go le.config.Callbacks.OnStartedLeading(internalStop) - le.renew(stop) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go le.config.Callbacks.OnStartedLeading(ctx) + le.renew(ctx) } // RunOrDie starts a client with the provided config or panics if the config // fails to validate. -func RunOrDie(stop <-chan struct{}, lec LeaderElectionConfig) { +func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { le, err := NewLeaderElector(lec) if err != nil { panic(err) } - le.Run(stop) + le.Run(ctx) } // GetLeader returns the identity of the last observed leader or returns the empty string if @@ -182,17 +182,10 @@ func (le *LeaderElector) IsLeader() bool { } // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. -// Returns false if stop signals done. -func (le *LeaderElector) acquire(stop <-chan struct{}) bool { - tmpStop := make(chan struct{}) - once := sync.Once{} - go func() { - select { - case <-stop: - once.Do(func() { close(tmpStop) }) - case <-tmpStop: - } - }() +// Returns false if ctx signals done. +func (le *LeaderElector) acquire(ctx context.Context) bool { + ctx, cancel := context.WithCancel(ctx) + defer cancel() succeeded := false desc := le.config.Lock.Describe() glog.Infof("attempting to acquire leader lease %v...", desc) @@ -205,41 +198,22 @@ func (le *LeaderElector) acquire(stop <-chan struct{}) bool { } le.config.Lock.RecordEvent("became leader") glog.Infof("successfully acquired lease %v", desc) - once.Do(func() { close(tmpStop) }) - }, le.config.RetryPeriod, JitterFactor, true, tmpStop) + cancel() + }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) return succeeded } // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. -func (le *LeaderElector) renew(stop <-chan struct{}) { - tmpStop := make(chan struct{}) - once := sync.Once{} - go func() { - select { - case <-stop: - once.Do(func() { close(tmpStop) }) - case <-tmpStop: - } - }() +func (le *LeaderElector) renew(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() wait.Until(func() { // PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod - t := time.NewTimer(le.config.RetryPeriod + le.config.RenewDeadline) - defer t.Stop() - internalStop := make(chan struct{}) - internalOnce := sync.Once{} - defer internalOnce.Do(func() { close(internalStop) }) - go func() { - select { - case <-tmpStop: - internalOnce.Do(func() { close(internalStop) }) - case <-t.C: - internalOnce.Do(func() { close(internalStop) }) - case <-internalStop: - } - }() + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RetryPeriod+le.config.RenewDeadline) + defer timeoutCancel() err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) { return le.tryAcquireOrRenew(), nil - }, internalStop) + }, timeoutCtx.Done()) le.maybeReportTransition() desc := le.config.Lock.Describe() if err == nil { @@ -248,8 +222,8 @@ func (le *LeaderElector) renew(stop <-chan struct{}) { } le.config.Lock.RecordEvent("stopped leading") glog.Infof("failed to renew lease %v: %v", desc, err) - once.Do(func() { close(tmpStop) }) - }, 0, tmpStop) + cancel() + }, 0, ctx.Done()) } // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, From 3252beb02be6791dde5bba181b4e5db65d148d53 Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Wed, 14 Feb 2018 22:19:11 +1100 Subject: [PATCH 3/5] Propagate signal from stop to context --- cmd/kube-scheduler/app/server.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index c7f34a6ed72..f815b479168 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -182,17 +182,26 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced) // Prepare a reusable run function. - run := func(stopCh <-chan struct{}) { + run := func(ctx context.Context) { sched.Run() - <-stopCh + <-ctx.Done() } + runCtx, cancel := context.WithCancel(context.TODO()) // once Run() accepts a context, it should be used here + defer cancel() + + go func() { + select { + case <-stopCh: + cancel() + case <-runCtx.Done(): + } + }() + // If leader election is enabled, run via LeaderElector until done and exit. if c.LeaderElection != nil { c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - run(ctx.Done()) - }, + OnStartedLeading: run, OnStoppedLeading: func() { utilruntime.HandleError(fmt.Errorf("lost master")) }, @@ -202,13 +211,13 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error return fmt.Errorf("couldn't create leader elector: %v", err) } - leaderElector.Run(context.TODO()) + leaderElector.Run(runCtx) return fmt.Errorf("lost lease") } // Leader election is disabled, so run inline until done. - run(stopCh) + run(runCtx) return fmt.Errorf("finished without leader elect") } From 102090d1f12f9b00571d440470ba040e0632fbe0 Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Thu, 7 Jun 2018 14:33:03 +1000 Subject: [PATCH 4/5] Use context.TODO() to be explicit that cancellation is not implemented --- cmd/cloud-controller-manager/app/controllermanager.go | 7 ++----- cmd/kube-controller-manager/app/controllermanager.go | 7 ++----- cmd/kube-scheduler/app/server.go | 8 ++++---- .../client-go/tools/leaderelection/leaderelection.go | 5 ++--- 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index f0348b42278..0b73740cbea 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -157,11 +157,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } } - runCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { - run(runCtx) + run(context.TODO()) panic("unreachable") } @@ -187,7 +184,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error { } // Try and become the leader and start cloud controller manager loops - leaderelection.RunOrDie(runCtx, leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index b465104571a..f3d45b2d13c 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -181,11 +181,8 @@ func Run(c *config.CompletedConfig) error { select {} } - runCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { - run(runCtx) + run(context.TODO()) panic("unreachable") } @@ -208,7 +205,7 @@ func Run(c *config.CompletedConfig) error { glog.Fatalf("error creating lock: %v", err) } - leaderelection.RunOrDie(runCtx, leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration, diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index f815b479168..6b8edc86cc8 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -187,14 +187,14 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error <-ctx.Done() } - runCtx, cancel := context.WithCancel(context.TODO()) // once Run() accepts a context, it should be used here + ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here defer cancel() go func() { select { case <-stopCh: cancel() - case <-runCtx.Done(): + case <-ctx.Done(): } }() @@ -211,13 +211,13 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error return fmt.Errorf("couldn't create leader elector: %v", err) } - leaderElector.Run(runCtx) + leaderElector.Run(ctx) return fmt.Errorf("lost lease") } // Leader election is disabled, so run inline until done. - run(runCtx) + run(ctx) return fmt.Errorf("finished without leader elect") } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index 63c29189fa9..233dae53f2e 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -208,10 +208,9 @@ func (le *LeaderElector) renew(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() wait.Until(func() { - // PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod - timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RetryPeriod+le.config.RenewDeadline) + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) defer timeoutCancel() - err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) { + err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { return le.tryAcquireOrRenew(), nil }, timeoutCtx.Done()) le.maybeReportTransition() From e458cfe02ccb3b816f86d8025c03da67eae3ff0d Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Sat, 9 Jun 2018 13:06:23 +1000 Subject: [PATCH 5/5] Rename context --- cmd/kube-controller-manager/app/controllermanager.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index f3d45b2d13c..1e270069595 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -145,7 +145,7 @@ func Run(c *config.CompletedConfig) error { } } - run := func(runCtx context.Context) { + run := func(ctx context.Context) { rootClientBuilder := controller.SimpleControllerClientBuilder{ ClientConfig: c.Kubeconfig, } @@ -165,18 +165,18 @@ func Run(c *config.CompletedConfig) error { } else { clientBuilder = rootClientBuilder } - ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, runCtx.Done()) + controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done()) if err != nil { glog.Fatalf("error building controller context: %v", err) } saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController - if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil { + if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode)); err != nil { glog.Fatalf("error starting controllers: %v", err) } - ctx.InformerFactory.Start(ctx.Stop) - close(ctx.InformersStarted) + controllerContext.InformerFactory.Start(controllerContext.Stop) + close(controllerContext.InformersStarted) select {} }