diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD index bed0f89fee7..51444677829 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD @@ -56,6 +56,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//staging/src/k8s.io/client-go/tools/leaderelection/example:all-srcs", "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:all-srcs", ], tags = ["automanaged"], diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD new file mode 100644 index 00000000000..cb1cfbca0fc --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD @@ -0,0 +1,38 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/leaderelection/example", + importpath = "k8s.io/client-go/tools/leaderelection/example", + visibility = ["//visibility:private"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library", + "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library", + "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_binary( + name = "example", + embed = [":go_default_library"], + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go new file mode 100644 index 00000000000..91511e5b16d --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go @@ -0,0 +1,122 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog" +) + +// main demonstrates a leader elected process that will step down if interrupted. +func main() { + klog.InitFlags(nil) + flag.Parse() + args := flag.Args() + if len(args) != 3 { + log.Fatalf("requires three arguments: ID NAMESPACE CONFIG_MAP_NAME (%d)", len(args)) + } + + // leader election uses the Kubernetes API by writing to a ConfigMap or Endpoints + // object. Conflicting writes are detected and each client handles those actions + // independently. + var config *rest.Config + var err error + if kubeconfig := os.Getenv("KUBECONFIG"); len(kubeconfig) > 0 { + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } else { + config, err = rest.InClusterConfig() + } + if err != nil { + log.Fatalf("failed to create client: %v", err) + } + + // we use the ConfigMap lock type since edits to ConfigMaps are less common + // and fewer objects in the cluster watch "all ConfigMaps" (unlike the older + // Endpoints lock type, where quite a few system agents like the kube-proxy + // and ingress controllers must watch endpoints). + id := args[0] + lock := &resourcelock.ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{ + Namespace: args[1], + Name: args[2], + }, + Client: kubernetes.NewForConfigOrDie(config).CoreV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + }, + } + + // use a Go context so we can tell the leaderelection code when we + // want to step down + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // listen for interrupts or the Linux SIGTERM signal and cancel + // our context, which the leader election code will observe and + // step down + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + go func() { + <-ch + log.Printf("Received termination, signaling shutdown") + cancel() + }() + + // start the leader election code loop + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + // IMPORTANT: you MUST ensure that any code you have that + // is protected by the lease must terminate **before** + // you call cancel. Otherwise, you could have a background + // loop still running and another process could + // get elected before your background loop finished, violating + // the stated goal of the lease. + ReleaseOnCancel: true, + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + // we're notified when we start - this is where you would + // usually put your code + log.Printf("%s: leading", id) + }, + OnStoppedLeading: func() { + // we can do cleanup here, or after the RunOrDie method + // returns + log.Printf("%s: lost", id) + }, + }, + }) + + // we no longer hold the lease, so perform any cleanup and then + // exit + log.Printf("%s: done", id) +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index 2096a5996bf..d3eb79b8e5c 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -118,6 +118,13 @@ type LeaderElectionConfig struct { // WatchDog may be null if its not needed/configured. WatchDog *HealthzAdaptor + // ReleaseOnCancel should be set true if the lock should be released + // when the run context is cancelled. If you set this to true, you must + // ensure all code guarded by this lease has successfully completed + // prior to cancelling the context, or you may have two processes + // simultaneously acting on the critical path. + ReleaseOnCancel bool + // Name is the name of the resource lock for debugging Name string } @@ -249,6 +256,28 @@ func (le *LeaderElector) renew(ctx context.Context) { klog.Infof("failed to renew lease %v: %v", desc, err) cancel() }, le.config.RetryPeriod, ctx.Done()) + + // if we hold the lease, give it up + if le.config.ReleaseOnCancel { + le.release() + } +} + +// release attempts to release the leader lease if we have acquired it. +func (le *LeaderElector) release() bool { + if !le.IsLeader() { + return true + } + leaderElectionRecord := rl.LeaderElectionRecord{ + LeaderTransitions: le.observedRecord.LeaderTransitions, + } + if err := le.config.Lock.Update(leaderElectionRecord); err != nil { + klog.Errorf("Failed to release lock: %v", err) + return false + } + le.observedRecord = leaderElectionRecord + le.observedTime = le.clock.Now() + return true } // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, @@ -284,7 +313,8 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { le.observedRecord = *oldLeaderElectionRecord le.observedTime = le.clock.Now() } - if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && + if len(oldLeaderElectionRecord.HolderIdentity) > 0 && + le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) return false diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go index 842aebdab2a..c286b257909 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go @@ -55,6 +55,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { past := time.Now().Add(-1000 * time.Hour) tests := []struct { + name string observedRecord rl.LeaderElectionRecord observedTime time.Time reactors []struct { @@ -66,8 +67,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader bool outHolder string }{ - // acquire from no object { + name: "acquire from no object", reactors: []struct { verb string reaction core.ReactionFunc @@ -88,8 +89,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { expectSuccess: true, outHolder: "baz", }, - // acquire from unled object { + name: "acquire from unled object", reactors: []struct { verb string reaction core.ReactionFunc @@ -116,8 +117,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader: true, outHolder: "baz", }, - // acquire from led, unacked object { + name: "acquire from led, unacked object", reactors: []struct { verb string reaction core.ReactionFunc @@ -149,8 +150,40 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader: true, outHolder: "baz", }, - // don't acquire from led, acked object { + name: "acquire from empty led, acked object", + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + objectMeta := metav1.ObjectMeta{ + Namespace: action.GetNamespace(), + Name: action.(core.GetAction).GetName(), + Annotations: map[string]string{ + rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":""}`, + }, + } + return true, createLockObject(objectType, objectMeta), nil + }, + }, + { + verb: "update", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject(), nil + }, + }, + }, + observedTime: future, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "don't acquire from led, acked object", reactors: []struct { verb string reaction core.ReactionFunc @@ -174,8 +207,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { expectSuccess: false, outHolder: "bing", }, - // renew already acquired object { + name: "renew already acquired object", reactors: []struct { verb string reaction core.ReactionFunc @@ -208,83 +241,86 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, } - for i, test := range tests { - // OnNewLeader is called async so we have to wait for it. - var wg sync.WaitGroup - wg.Add(1) - var reportedLeader string - var lock rl.Interface + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + // OnNewLeader is called async so we have to wait for it. + var wg sync.WaitGroup + wg.Add(1) + var reportedLeader string + var lock rl.Interface - objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} - resourceLockConfig := rl.ResourceLockConfig{ - Identity: "baz", - EventRecorder: &record.FakeRecorder{}, - } - c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} - for _, reactor := range test.reactors { - c.AddReactor(reactor.verb, objectType, reactor.reaction) - } - c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { - t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action) - return true, nil, fmt.Errorf("unreachable action") - }) - - switch objectType { - case "endpoints": - lock = &rl.EndpointsLock{ - EndpointsMeta: objectMeta, - LockConfig: resourceLockConfig, - Client: c, + objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} + resourceLockConfig := rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, } - case "configmaps": - lock = &rl.ConfigMapLock{ - ConfigMapMeta: objectMeta, - LockConfig: resourceLockConfig, - Client: c, + c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, objectType, reactor.reaction) } - } + c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + t.Errorf("unreachable action. testclient called too many times: %+v", action) + return true, nil, fmt.Errorf("unreachable action") + }) - lec := LeaderElectionConfig{ - Lock: lock, - LeaseDuration: 10 * time.Second, - Callbacks: LeaderCallbacks{ - OnNewLeader: func(l string) { - defer wg.Done() - reportedLeader = l + switch objectType { + case "endpoints": + lock = &rl.EndpointsLock{ + EndpointsMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c, + } + case "configmaps": + lock = &rl.ConfigMapLock{ + ConfigMapMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c, + } + } + + lec := LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 10 * time.Second, + Callbacks: LeaderCallbacks{ + OnNewLeader: func(l string) { + defer wg.Done() + reportedLeader = l + }, }, - }, - } - le := &LeaderElector{ - config: lec, - observedRecord: test.observedRecord, - observedTime: test.observedTime, - clock: clock.RealClock{}, - } + } + le := &LeaderElector{ + config: lec, + observedRecord: test.observedRecord, + observedTime: test.observedTime, + clock: clock.RealClock{}, + } - if test.expectSuccess != le.tryAcquireOrRenew() { - t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeeded=%v]", i, !test.expectSuccess) - } + if test.expectSuccess != le.tryAcquireOrRenew() { + t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) + } - le.observedRecord.AcquireTime = metav1.Time{} - le.observedRecord.RenewTime = metav1.Time{} - if le.observedRecord.HolderIdentity != test.outHolder { - t.Errorf("[%v]expected holder:\n\t%+v\ngot:\n\t%+v", i, test.outHolder, le.observedRecord.HolderIdentity) - } - if len(test.reactors) != len(c.Actions()) { - t.Errorf("[%v]wrong number of api interactions", i) - } - if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { - t.Errorf("[%v]leader should have transitioned but did not", i) - } - if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { - t.Errorf("[%v]leader should not have transitioned but did", i) - } + le.observedRecord.AcquireTime = metav1.Time{} + le.observedRecord.RenewTime = metav1.Time{} + if le.observedRecord.HolderIdentity != test.outHolder { + t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity) + } + if len(test.reactors) != len(c.Actions()) { + t.Errorf("wrong number of api interactions") + } + if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { + t.Errorf("leader should have transitioned but did not") + } + if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { + t.Errorf("leader should not have transitioned but did") + } - le.maybeReportTransition() - wg.Wait() - if reportedLeader != test.outHolder { - t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader) - } + le.maybeReportTransition() + wg.Wait() + if reportedLeader != test.outHolder { + t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) + } + }) } } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD index 07c797c23f8..367d454d49c 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD @@ -17,8 +17,8 @@ go_library( deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go index c12daad022f..785356894f1 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go @@ -93,6 +93,9 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error { // RecordEvent in leader election while adding meta-data func (cml *ConfigMapLock) RecordEvent(s string) { + if cml.LockConfig.EventRecorder == nil { + return + } events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go index 6f7dcfb0cc9..bfe5e8b1bb3 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go @@ -88,6 +88,9 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error { // RecordEvent in leader election while adding meta-data func (el *EndpointsLock) RecordEvent(s string) { + if el.LockConfig.EventRecorder == nil { + return + } events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s) el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go index 676fd1d7dbc..0bf8e9cd6e6 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go @@ -20,8 +20,8 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" ) const ( @@ -35,6 +35,11 @@ const ( // with a random string (e.g. UUID) with only slight modification of this code. // TODO(mikedanese): this should potentially be versioned type LeaderElectionRecord struct { + // HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and + // all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not + // attempt to acquire leases with empty identities and will wait for the full lease + // interval to expire before attempting to reacquire. This value is set to empty when + // a client voluntarily steps down. HolderIdentity string `json:"holderIdentity"` LeaseDurationSeconds int `json:"leaseDurationSeconds"` AcquireTime metav1.Time `json:"acquireTime"` @@ -42,11 +47,19 @@ type LeaderElectionRecord struct { LeaderTransitions int `json:"leaderTransitions"` } +// EventRecorder records a change in the ResourceLock. +type EventRecorder interface { + Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{}) +} + // ResourceLockConfig common data that exists across different // resource locks type ResourceLockConfig struct { - Identity string - EventRecorder record.EventRecorder + // Identity is the unique string identifying a lease holder across + // all participants in an election. + Identity string + // EventRecorder is optional. + EventRecorder EventRecorder } // Interface offers a common interface for locking on arbitrary