From ac40ba32975a441002e2ef5807fef32aab294292 Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Wed, 12 Oct 2022 20:53:05 +1100 Subject: [PATCH] Always emit the stopped leading event Kubernetes-commit: 771ab7488d53f0d6c85238992a8e5f4bcb73e24e --- tools/leaderelection/leaderelection.go | 9 ++-- tools/leaderelection/leaderelection_test.go | 57 ++++++++++++++++++--- 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/tools/leaderelection/leaderelection.go b/tools/leaderelection/leaderelection.go index c64ba9b2..eb72b251 100644 --- a/tools/leaderelection/leaderelection.go +++ b/tools/leaderelection/leaderelection.go @@ -64,9 +64,8 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" rl "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/utils/clock" - "k8s.io/klog/v2" + "k8s.io/utils/clock" ) const ( @@ -199,9 +198,7 @@ type LeaderElector struct { // stopped holding the leader lease func (le *LeaderElector) Run(ctx context.Context) { defer runtime.HandleCrash() - defer func() { - le.config.Callbacks.OnStoppedLeading() - }() + defer le.config.Callbacks.OnStoppedLeading() if !le.acquire(ctx) { return // ctx signalled done @@ -263,6 +260,7 @@ func (le *LeaderElector) acquire(ctx context.Context) bool { // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. func (le *LeaderElector) renew(ctx context.Context) { + defer le.config.Lock.RecordEvent("stopped leading") ctx, cancel := context.WithCancel(ctx) defer cancel() wait.Until(func() { @@ -278,7 +276,6 @@ func (le *LeaderElector) renew(ctx context.Context) { klog.V(5).Infof("successfully renewed lease %v", desc) return } - le.config.Lock.RecordEvent("stopped leading") le.metrics.leaderOff(le.config.Name) klog.Infof("failed to renew lease %v: %v", desc, err) cancel() diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go index 6b5b0790..ea047658 100644 --- a/tools/leaderelection/leaderelection_test.go +++ b/tools/leaderelection/leaderelection_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/util/wait" "sync" "testing" "time" @@ -81,6 +82,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { observedRecord rl.LeaderElectionRecord observedTime time.Time reactors []Reactor + expectedEvents []string expectSuccess bool transitionLeader bool @@ -240,9 +242,10 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { var lock rl.Interface objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} + recorder := record.NewFakeRecorder(100) resourceLockConfig := rl.ResourceLockConfig{ Identity: "baz", - EventRecorder: &record.FakeRecorder{}, + EventRecorder: recorder, } c := &fake.Clientset{} for _, reactor := range test.reactors { @@ -306,6 +309,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { if reportedLeader != test.outHolder { t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) } + assertEqualEvents(t, test.expectedEvents, recorder.Events) }) } } @@ -383,6 +387,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) { observedRawRecord []byte observedTime time.Time reactors []Reactor + expectedEvents []string expectSuccess bool transitionLeader bool @@ -799,9 +804,10 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) { wg.Add(1) var reportedLeader string + recorder := record.NewFakeRecorder(100) resourceLockConfig := rl.ResourceLockConfig{ Identity: "baz", - EventRecorder: &record.FakeRecorder{}, + EventRecorder: recorder, } c := &fake.Clientset{} for _, reactor := range test.reactors { @@ -858,6 +864,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) { if reportedLeader != test.outHolder { t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) } + assertEqualEvents(t, test.expectedEvents, recorder.Events) }) } } @@ -878,6 +885,7 @@ func testReleaseLease(t *testing.T, objectType string) { observedRecord rl.LeaderElectionRecord observedTime time.Time reactors []Reactor + expectedEvents []string expectSuccess bool transitionLeader bool @@ -923,9 +931,10 @@ func testReleaseLease(t *testing.T, objectType string) { var lock rl.Interface objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} + recorder := record.NewFakeRecorder(100) resourceLockConfig := rl.ResourceLockConfig{ Identity: "baz", - EventRecorder: &record.FakeRecorder{}, + EventRecorder: recorder, } c := &fake.Clientset{} for _, reactor := range test.reactors { @@ -998,6 +1007,7 @@ func testReleaseLease(t *testing.T, objectType string) { if reportedLeader != test.outHolder { t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) } + assertEqualEvents(t, test.expectedEvents, recorder.Events) }) } } @@ -1051,8 +1061,9 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { } tests := []struct { - name string - reactors []Reactor + name string + reactors []Reactor + expectedEvents []string }{ { name: "release acquired lock on cancellation of update", @@ -1100,6 +1111,10 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { }, }, }, + expectedEvents: []string{ + "Normal LeaderElection baz became leader", + "Normal LeaderElection baz stopped leading", + }, }, { name: "release acquired lock on cancellation of get", @@ -1148,6 +1163,10 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { }, }, }, + expectedEvents: []string{ + "Normal LeaderElection baz became leader", + "Normal LeaderElection baz stopped leading", + }, }, } @@ -1157,9 +1176,10 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { wg.Add(1) resetVars() + recorder := record.NewFakeRecorder(100) resourceLockConfig := rl.ResourceLockConfig{ Identity: "baz", - EventRecorder: &record.FakeRecorder{}, + EventRecorder: recorder, } c := &fake.Clientset{} for _, reactor := range test.reactors { @@ -1212,6 +1232,31 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { t.Fatal("the lock was not released") } wg.Wait() + assertEqualEvents(t, test.expectedEvents, recorder.Events) }) } } + +func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) { + c := time.After(wait.ForeverTestTimeout) + for _, e := range expected { + select { + case a := <-actual: + if e != a { + t.Errorf("Expected event %q, got %q", e, a) + return + } + case <-c: + t.Errorf("Expected event %q, got nothing", e) + // continue iterating to print all expected events + } + } + for { + select { + case a := <-actual: + t.Errorf("Unexpected event: %q", a) + default: + return // No more events, as expected. + } + } +}