Always emit the stopped leading event

Kubernetes-commit: 771ab7488d53f0d6c85238992a8e5f4bcb73e24e
This commit is contained in:
Mikhail Mazurskiy 2022-10-12 20:53:05 +11:00 committed by Kubernetes Publisher
parent 84ad8a7920
commit ac40ba3297
2 changed files with 54 additions and 12 deletions

View File

@ -64,9 +64,8 @@ import (
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
rl "k8s.io/client-go/tools/leaderelection/resourcelock" rl "k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/utils/clock"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/utils/clock"
) )
const ( const (
@ -199,9 +198,7 @@ type LeaderElector struct {
// stopped holding the leader lease // stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) { func (le *LeaderElector) Run(ctx context.Context) {
defer runtime.HandleCrash() defer runtime.HandleCrash()
defer func() { defer le.config.Callbacks.OnStoppedLeading()
le.config.Callbacks.OnStoppedLeading()
}()
if !le.acquire(ctx) { if !le.acquire(ctx) {
return // ctx signalled done return // ctx signalled done
@ -263,6 +260,7 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) { func (le *LeaderElector) renew(ctx context.Context) {
defer le.config.Lock.RecordEvent("stopped leading")
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
wait.Until(func() { wait.Until(func() {
@ -278,7 +276,6 @@ func (le *LeaderElector) renew(ctx context.Context) {
klog.V(5).Infof("successfully renewed lease %v", desc) klog.V(5).Infof("successfully renewed lease %v", desc)
return return
} }
le.config.Lock.RecordEvent("stopped leading")
le.metrics.leaderOff(le.config.Name) le.metrics.leaderOff(le.config.Name)
klog.Infof("failed to renew lease %v: %v", desc, err) klog.Infof("failed to renew lease %v: %v", desc, err)
cancel() cancel()

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"k8s.io/apimachinery/pkg/util/wait"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -81,6 +82,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
observedRecord rl.LeaderElectionRecord observedRecord rl.LeaderElectionRecord
observedTime time.Time observedTime time.Time
reactors []Reactor reactors []Reactor
expectedEvents []string
expectSuccess bool expectSuccess bool
transitionLeader bool transitionLeader bool
@ -240,9 +242,10 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
var lock rl.Interface var lock rl.Interface
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
recorder := record.NewFakeRecorder(100)
resourceLockConfig := rl.ResourceLockConfig{ resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz", Identity: "baz",
EventRecorder: &record.FakeRecorder{}, EventRecorder: recorder,
} }
c := &fake.Clientset{} c := &fake.Clientset{}
for _, reactor := range test.reactors { for _, reactor := range test.reactors {
@ -306,6 +309,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
if reportedLeader != test.outHolder { if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
} }
assertEqualEvents(t, test.expectedEvents, recorder.Events)
}) })
} }
} }
@ -383,6 +387,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
observedRawRecord []byte observedRawRecord []byte
observedTime time.Time observedTime time.Time
reactors []Reactor reactors []Reactor
expectedEvents []string
expectSuccess bool expectSuccess bool
transitionLeader bool transitionLeader bool
@ -799,9 +804,10 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
wg.Add(1) wg.Add(1)
var reportedLeader string var reportedLeader string
recorder := record.NewFakeRecorder(100)
resourceLockConfig := rl.ResourceLockConfig{ resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz", Identity: "baz",
EventRecorder: &record.FakeRecorder{}, EventRecorder: recorder,
} }
c := &fake.Clientset{} c := &fake.Clientset{}
for _, reactor := range test.reactors { for _, reactor := range test.reactors {
@ -858,6 +864,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
if reportedLeader != test.outHolder { if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
} }
assertEqualEvents(t, test.expectedEvents, recorder.Events)
}) })
} }
} }
@ -878,6 +885,7 @@ func testReleaseLease(t *testing.T, objectType string) {
observedRecord rl.LeaderElectionRecord observedRecord rl.LeaderElectionRecord
observedTime time.Time observedTime time.Time
reactors []Reactor reactors []Reactor
expectedEvents []string
expectSuccess bool expectSuccess bool
transitionLeader bool transitionLeader bool
@ -923,9 +931,10 @@ func testReleaseLease(t *testing.T, objectType string) {
var lock rl.Interface var lock rl.Interface
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
recorder := record.NewFakeRecorder(100)
resourceLockConfig := rl.ResourceLockConfig{ resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz", Identity: "baz",
EventRecorder: &record.FakeRecorder{}, EventRecorder: recorder,
} }
c := &fake.Clientset{} c := &fake.Clientset{}
for _, reactor := range test.reactors { for _, reactor := range test.reactors {
@ -998,6 +1007,7 @@ func testReleaseLease(t *testing.T, objectType string) {
if reportedLeader != test.outHolder { if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
} }
assertEqualEvents(t, test.expectedEvents, recorder.Events)
}) })
} }
} }
@ -1051,8 +1061,9 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
} }
tests := []struct { tests := []struct {
name string name string
reactors []Reactor reactors []Reactor
expectedEvents []string
}{ }{
{ {
name: "release acquired lock on cancellation of update", name: "release acquired lock on cancellation of update",
@ -1100,6 +1111,10 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
}, },
}, },
}, },
expectedEvents: []string{
"Normal LeaderElection baz became leader",
"Normal LeaderElection baz stopped leading",
},
}, },
{ {
name: "release acquired lock on cancellation of get", name: "release acquired lock on cancellation of get",
@ -1148,6 +1163,10 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
}, },
}, },
}, },
expectedEvents: []string{
"Normal LeaderElection baz became leader",
"Normal LeaderElection baz stopped leading",
},
}, },
} }
@ -1157,9 +1176,10 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
wg.Add(1) wg.Add(1)
resetVars() resetVars()
recorder := record.NewFakeRecorder(100)
resourceLockConfig := rl.ResourceLockConfig{ resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz", Identity: "baz",
EventRecorder: &record.FakeRecorder{}, EventRecorder: recorder,
} }
c := &fake.Clientset{} c := &fake.Clientset{}
for _, reactor := range test.reactors { for _, reactor := range test.reactors {
@ -1212,6 +1232,31 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
t.Fatal("the lock was not released") t.Fatal("the lock was not released")
} }
wg.Wait() wg.Wait()
assertEqualEvents(t, test.expectedEvents, recorder.Events)
}) })
} }
} }
func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) {
c := time.After(wait.ForeverTestTimeout)
for _, e := range expected {
select {
case a := <-actual:
if e != a {
t.Errorf("Expected event %q, got %q", e, a)
return
}
case <-c:
t.Errorf("Expected event %q, got nothing", e)
// continue iterating to print all expected events
}
}
for {
select {
case a := <-actual:
t.Errorf("Unexpected event: %q", a)
default:
return // No more events, as expected.
}
}
}