From 09890b6c48da8e85237a5674d6256900f482b0a5 Mon Sep 17 00:00:00 2001
From: Clayton Coleman
Date: Tue, 27 Nov 2018 21:51:21 -0500
Subject: [PATCH] leaderelection: Allow leader elected code to step down on a context cancel

The current code simply exits without continuing to renew the lease,
which means participants using a slower lease duration might have to
wait multiple minutes before a new leader is elected. Allow an optional
flag to be set on LeaderElectionConfig that will release the lease when
the calling context is cancelled. Callers *must* ensure their lease
guarded code has completed before the context is cancelled, or other
processes may acquire the lease before this lease has released.

Add an example command that demonstrates how cancellation could be done.

As a convenience to users, make event recorder optional - not all users
of the lock code will need a recorder.
---
 .../client-go/tools/leaderelection/BUILD      |   1 +
 .../tools/leaderelection/example/BUILD        |  38 ++++
 .../tools/leaderelection/example/main.go      | 122 ++++++++++++
 .../tools/leaderelection/leaderelection.go    |  32 ++-
 .../leaderelection/leaderelection_test.go     | 184 +++++++++++-------
 .../tools/leaderelection/resourcelock/BUILD   |   2 +-
 .../resourcelock/configmaplock.go             |   3 +
 .../resourcelock/endpointslock.go             |   3 +
 .../leaderelection/resourcelock/interface.go  |  19 +-
 9 files changed, 325 insertions(+), 79 deletions(-)
 create mode 100644 staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD
 create mode 100644 staging/src/k8s.io/client-go/tools/leaderelection/example/main.go

diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD
index bed0f89fee7..51444677829 100644
--- a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD
@@ -56,6 +56,7 @@ filegroup(
     name = "all-srcs",
     srcs = [
         ":package-srcs",
+        "//staging/src/k8s.io/client-go/tools/leaderelection/example:all-srcs",
         "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:all-srcs",
     ],
     tags = ["automanaged"],
 )
diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD
new file mode 100644
index 00000000000..cb1cfbca0fc
--- /dev/null
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD
@@ -0,0 +1,38 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["main.go"],
+    importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/leaderelection/example",
+    importpath = "k8s.io/client-go/tools/leaderelection/example",
+    visibility = ["//visibility:private"],
+    deps = [
+        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
+        "//staging/src/k8s.io/client-go/rest:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
+        "//vendor/k8s.io/klog:go_default_library",
+    ],
+)
+
+go_binary(
+    name = "example",
+    embed = [":go_default_library"],
+    visibility = ["//visibility:public"],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = 
["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go new file mode 100644 index 00000000000..91511e5b16d --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go @@ -0,0 +1,122 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog" +) + +// main demonstrates a leader elected process that will step down if interrupted. +func main() { + klog.InitFlags(nil) + flag.Parse() + args := flag.Args() + if len(args) != 3 { + log.Fatalf("requires three arguments: ID NAMESPACE CONFIG_MAP_NAME (%d)", len(args)) + } + + // leader election uses the Kubernetes API by writing to a ConfigMap or Endpoints + // object. Conflicting writes are detected and each client handles those actions + // independently. + var config *rest.Config + var err error + if kubeconfig := os.Getenv("KUBECONFIG"); len(kubeconfig) > 0 { + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } else { + config, err = rest.InClusterConfig() + } + if err != nil { + log.Fatalf("failed to create client: %v", err) + } + + // we use the ConfigMap lock type since edits to ConfigMaps are less common + // and fewer objects in the cluster watch "all ConfigMaps" (unlike the older + // Endpoints lock type, where quite a few system agents like the kube-proxy + // and ingress controllers must watch endpoints). + id := args[0] + lock := &resourcelock.ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{ + Namespace: args[1], + Name: args[2], + }, + Client: kubernetes.NewForConfigOrDie(config).CoreV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + }, + } + + // use a Go context so we can tell the leaderelection code when we + // want to step down + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // listen for interrupts or the Linux SIGTERM signal and cancel + // our context, which the leader election code will observe and + // step down + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + go func() { + <-ch + log.Printf("Received termination, signaling shutdown") + cancel() + }() + + // start the leader election code loop + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + // IMPORTANT: you MUST ensure that any code you have that + // is protected by the lease must terminate **before** + // you call cancel. 
Otherwise, you could have a background + // loop still running and another process could + // get elected before your background loop finished, violating + // the stated goal of the lease. + ReleaseOnCancel: true, + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + // we're notified when we start - this is where you would + // usually put your code + log.Printf("%s: leading", id) + }, + OnStoppedLeading: func() { + // we can do cleanup here, or after the RunOrDie method + // returns + log.Printf("%s: lost", id) + }, + }, + }) + + // we no longer hold the lease, so perform any cleanup and then + // exit + log.Printf("%s: done", id) +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index 2096a5996bf..d3eb79b8e5c 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -118,6 +118,13 @@ type LeaderElectionConfig struct { // WatchDog may be null if its not needed/configured. WatchDog *HealthzAdaptor + // ReleaseOnCancel should be set true if the lock should be released + // when the run context is cancelled. If you set this to true, you must + // ensure all code guarded by this lease has successfully completed + // prior to cancelling the context, or you may have two processes + // simultaneously acting on the critical path. + ReleaseOnCancel bool + // Name is the name of the resource lock for debugging Name string } @@ -249,6 +256,28 @@ func (le *LeaderElector) renew(ctx context.Context) { klog.Infof("failed to renew lease %v: %v", desc, err) cancel() }, le.config.RetryPeriod, ctx.Done()) + + // if we hold the lease, give it up + if le.config.ReleaseOnCancel { + le.release() + } +} + +// release attempts to release the leader lease if we have acquired it. 
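+// Note that release is best-effort: it clears HolderIdentity by updating the
+// lock object once, and if that update fails the error is only logged and the
+// previous record is left in place, so other candidates may still have to
+// wait for the old lease to expire.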
+func (le *LeaderElector) release() bool { + if !le.IsLeader() { + return true + } + leaderElectionRecord := rl.LeaderElectionRecord{ + LeaderTransitions: le.observedRecord.LeaderTransitions, + } + if err := le.config.Lock.Update(leaderElectionRecord); err != nil { + klog.Errorf("Failed to release lock: %v", err) + return false + } + le.observedRecord = leaderElectionRecord + le.observedTime = le.clock.Now() + return true } // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, @@ -284,7 +313,8 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { le.observedRecord = *oldLeaderElectionRecord le.observedTime = le.clock.Now() } - if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && + if len(oldLeaderElectionRecord.HolderIdentity) > 0 && + le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) return false diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go index 842aebdab2a..c286b257909 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go @@ -55,6 +55,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { past := time.Now().Add(-1000 * time.Hour) tests := []struct { + name string observedRecord rl.LeaderElectionRecord observedTime time.Time reactors []struct { @@ -66,8 +67,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader bool outHolder string }{ - // acquire from no object { + name: "acquire from no object", reactors: []struct { verb string reaction core.ReactionFunc @@ -88,8 +89,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { expectSuccess: true, outHolder: "baz", }, - // acquire from unled object { + name: "acquire from unled object", reactors: []struct { verb string reaction core.ReactionFunc @@ -116,8 +117,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader: true, outHolder: "baz", }, - // acquire from led, unacked object { + name: "acquire from led, unacked object", reactors: []struct { verb string reaction core.ReactionFunc @@ -149,8 +150,40 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader: true, outHolder: "baz", }, - // don't acquire from led, acked object { + name: "acquire from empty led, acked object", + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + objectMeta := metav1.ObjectMeta{ + Namespace: action.GetNamespace(), + Name: action.(core.GetAction).GetName(), + Annotations: map[string]string{ + rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":""}`, + }, + } + return true, createLockObject(objectType, objectMeta), nil + }, + }, + { + verb: "update", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject(), nil + }, + }, + }, + observedTime: future, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "don't acquire from led, acked object", reactors: []struct { verb string reaction core.ReactionFunc @@ -174,8 +207,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { expectSuccess: false, outHolder: "bing", 
}, - // renew already acquired object { + name: "renew already acquired object", reactors: []struct { verb string reaction core.ReactionFunc @@ -208,83 +241,86 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, } - for i, test := range tests { - // OnNewLeader is called async so we have to wait for it. - var wg sync.WaitGroup - wg.Add(1) - var reportedLeader string - var lock rl.Interface + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + // OnNewLeader is called async so we have to wait for it. + var wg sync.WaitGroup + wg.Add(1) + var reportedLeader string + var lock rl.Interface - objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} - resourceLockConfig := rl.ResourceLockConfig{ - Identity: "baz", - EventRecorder: &record.FakeRecorder{}, - } - c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} - for _, reactor := range test.reactors { - c.AddReactor(reactor.verb, objectType, reactor.reaction) - } - c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { - t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action) - return true, nil, fmt.Errorf("unreachable action") - }) - - switch objectType { - case "endpoints": - lock = &rl.EndpointsLock{ - EndpointsMeta: objectMeta, - LockConfig: resourceLockConfig, - Client: c, + objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} + resourceLockConfig := rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, } - case "configmaps": - lock = &rl.ConfigMapLock{ - ConfigMapMeta: objectMeta, - LockConfig: resourceLockConfig, - Client: c, + c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, objectType, reactor.reaction) } - } + c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + t.Errorf("unreachable action. 
testclient called too many times: %+v", action) + return true, nil, fmt.Errorf("unreachable action") + }) - lec := LeaderElectionConfig{ - Lock: lock, - LeaseDuration: 10 * time.Second, - Callbacks: LeaderCallbacks{ - OnNewLeader: func(l string) { - defer wg.Done() - reportedLeader = l + switch objectType { + case "endpoints": + lock = &rl.EndpointsLock{ + EndpointsMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c, + } + case "configmaps": + lock = &rl.ConfigMapLock{ + ConfigMapMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c, + } + } + + lec := LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 10 * time.Second, + Callbacks: LeaderCallbacks{ + OnNewLeader: func(l string) { + defer wg.Done() + reportedLeader = l + }, }, - }, - } - le := &LeaderElector{ - config: lec, - observedRecord: test.observedRecord, - observedTime: test.observedTime, - clock: clock.RealClock{}, - } + } + le := &LeaderElector{ + config: lec, + observedRecord: test.observedRecord, + observedTime: test.observedTime, + clock: clock.RealClock{}, + } - if test.expectSuccess != le.tryAcquireOrRenew() { - t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeeded=%v]", i, !test.expectSuccess) - } + if test.expectSuccess != le.tryAcquireOrRenew() { + t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) + } - le.observedRecord.AcquireTime = metav1.Time{} - le.observedRecord.RenewTime = metav1.Time{} - if le.observedRecord.HolderIdentity != test.outHolder { - t.Errorf("[%v]expected holder:\n\t%+v\ngot:\n\t%+v", i, test.outHolder, le.observedRecord.HolderIdentity) - } - if len(test.reactors) != len(c.Actions()) { - t.Errorf("[%v]wrong number of api interactions", i) - } - if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { - t.Errorf("[%v]leader should have transitioned but did not", i) - } - if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { - t.Errorf("[%v]leader should not have transitioned but did", i) - } + le.observedRecord.AcquireTime = metav1.Time{} + le.observedRecord.RenewTime = metav1.Time{} + if le.observedRecord.HolderIdentity != test.outHolder { + t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity) + } + if len(test.reactors) != len(c.Actions()) { + t.Errorf("wrong number of api interactions") + } + if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { + t.Errorf("leader should have transitioned but did not") + } + if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { + t.Errorf("leader should not have transitioned but did") + } - le.maybeReportTransition() - wg.Wait() - if reportedLeader != test.outHolder { - t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader) - } + le.maybeReportTransition() + wg.Wait() + if reportedLeader != test.outHolder { + t.Errorf("reported leader was not the new leader. 
expected %q, got %q", test.outHolder, reportedLeader) + } + }) } } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD index 07c797c23f8..367d454d49c 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD @@ -17,8 +17,8 @@ go_library( deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go index c12daad022f..785356894f1 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go @@ -93,6 +93,9 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error { // RecordEvent in leader election while adding meta-data func (cml *ConfigMapLock) RecordEvent(s string) { + if cml.LockConfig.EventRecorder == nil { + return + } events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go index 6f7dcfb0cc9..bfe5e8b1bb3 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go @@ -88,6 +88,9 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error { // RecordEvent in leader election while adding meta-data func (el *EndpointsLock) RecordEvent(s string) { + if el.LockConfig.EventRecorder == nil { + return + } events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s) el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go index 676fd1d7dbc..0bf8e9cd6e6 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go @@ -20,8 +20,8 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" ) const ( @@ -35,6 +35,11 @@ const ( // with a random string (e.g. UUID) with only slight modification of this code. // TODO(mikedanese): this should potentially be versioned type LeaderElectionRecord struct { + // HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and + // all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not + // attempt to acquire leases with empty identities and will wait for the full lease + // interval to expire before attempting to reacquire. 
This value is set to empty when + // a client voluntarily steps down. HolderIdentity string `json:"holderIdentity"` LeaseDurationSeconds int `json:"leaseDurationSeconds"` AcquireTime metav1.Time `json:"acquireTime"` @@ -42,11 +47,19 @@ type LeaderElectionRecord struct { LeaderTransitions int `json:"leaderTransitions"` } +// EventRecorder records a change in the ResourceLock. +type EventRecorder interface { + Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{}) +} + // ResourceLockConfig common data that exists across different // resource locks type ResourceLockConfig struct { - Identity string - EventRecorder record.EventRecorder + // Identity is the unique string identifying a lease holder across + // all participants in an election. + Identity string + // EventRecorder is optional. + EventRecorder EventRecorder } // Interface offers a common interface for locking on arbitrary