client-go leader-election: structured, contextual logging

Kubernetes-commit: 63f304708a0fab5078739415f589eff9f2e9dfc7
Patrick Ohly
2025-08-25 16:28:53 +02:00
committed by Kubernetes Publisher
parent d327527793
commit b65019457b
4 changed files with 63 additions and 38 deletions
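
Throughout the change, package-level printf-style klog calls are replaced by a logger retrieved from the context, with structured key/value pairs, and crash/error handlers switch to their *WithContext variants. A minimal sketch of the logging pattern, assuming a hypothetical acquireExample helper (the message strings mirror the diff below; the surrounding function is made up):

    package example

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // acquireExample is a made-up stand-in for code like tryAcquireOrRenew.
    func acquireExample(ctx context.Context, desc string, err error) {
        // Before: klog.Infof("successfully acquired lease %v", desc)
        //         klog.Errorf("Failed to update lock: %v", err)

        // After: the logger comes from the context, the message is a constant
        // string, and variable data is passed as key/value pairs.
        logger := klog.FromContext(ctx)
        logger.Info("Successfully acquired lease", "lock", desc)
        if err != nil {
            logger.Error(err, "Failed to update lease", "lock", desc)
        }
    }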

View File

@@ -209,7 +209,7 @@ type LeaderElector struct {
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
defer runtime.HandleCrash()
defer runtime.HandleCrashWithContext(ctx)
defer le.config.Callbacks.OnStoppedLeading()
if !le.acquire(ctx) {
@@ -254,7 +254,8 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
defer cancel()
succeeded := false
desc := le.config.Lock.Describe()
klog.Infof("attempting to acquire leader lease %v...", desc)
logger := klog.FromContext(ctx)
logger.Info("Attempting to acquire leader lease...", "lock", desc)
wait.JitterUntil(func() {
if !le.config.Coordinated {
succeeded = le.tryAcquireOrRenew(ctx)
@@ -263,12 +264,12 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
}
le.maybeReportTransition()
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
logger.V(4).Info("Failed to acquire lease", "lock", desc)
return
}
le.config.Lock.RecordEvent("became leader")
le.metrics.leaderOn(le.config.Name)
klog.Infof("successfully acquired lease %v", desc)
logger.Info("Successfully acquired lease", "lock", desc)
cancel()
}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
return succeeded
@@ -279,6 +280,7 @@ func (le *LeaderElector) renew(ctx context.Context) {
defer le.config.Lock.RecordEvent("stopped leading")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger := klog.FromContext(ctx)
wait.Until(func() {
err := wait.PollUntilContextTimeout(ctx, le.config.RetryPeriod, le.config.RenewDeadline, true, func(ctx context.Context) (done bool, err error) {
if !le.config.Coordinated {
@@ -290,22 +292,22 @@ func (le *LeaderElector) renew(ctx context.Context) {
le.maybeReportTransition()
desc := le.config.Lock.Describe()
if err == nil {
klog.V(5).Infof("successfully renewed lease %v", desc)
logger.V(5).Info("Successfully renewed lease", "lock", desc)
return
}
le.metrics.leaderOff(le.config.Name)
klog.Infof("failed to renew lease %v: %v", desc, err)
logger.Info("Failed to renew lease", "lock", desc, "err", err)
cancel()
}, le.config.RetryPeriod, ctx.Done())
// if we hold the lease, give it up
if le.config.ReleaseOnCancel {
le.release()
le.release(logger)
}
}
// release attempts to release the leader lease if we have acquired it.
func (le *LeaderElector) release() bool {
func (le *LeaderElector) release(logger klog.Logger) bool {
ctx := context.Background()
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
@@ -313,10 +315,10 @@ func (le *LeaderElector) release() bool {
oldLeaderElectionRecord, _, err := le.config.Lock.Get(timeoutCtx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
logger.Error(err, "error retrieving resource lock", "lock", le.config.Lock.Describe())
return false
}
klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
logger.Info("lease lock not found", "lock", le.config.Lock.Describe())
return false
}
@@ -331,7 +333,7 @@ func (le *LeaderElector) release() bool {
AcquireTime: now,
}
if err := le.config.Lock.Update(timeoutCtx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
logger.Error(err, "Failed to release lease", "lock", le.config.Lock.Describe())
return false
}
@@ -343,6 +345,7 @@ func (le *LeaderElector) release() bool {
// lease if it has already been acquired. Returns true on success else returns
// false.
func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
logger := klog.FromContext(ctx)
now := metav1.NewTime(le.clock.Now())
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
@@ -355,10 +358,10 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
logger.Error(err, "Error retrieving lease lock", "lock", le.config.Lock.Describe())
return false
}
klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
logger.Info("Lease lock not found", "lock", le.config.Lock.Describe(), "err", err)
return false
}
@@ -371,18 +374,18 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time)
if hasExpired {
klog.Infof("lock has expired: %v", le.config.Lock.Describe())
logger.Info("Lease has expired", "lock", le.config.Lock.Describe())
return false
}
if !le.IsLeader() {
klog.V(6).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe())
logger.V(6).Info("Lease is held and has not yet expired", "lock", le.config.Lock.Describe(), "holder", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 2b. If the lease has been marked as "end of term", don't renew it
if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" {
klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe())
logger.V(4).Info("Lease is marked as 'end of term'", "lock", le.config.Lock.Describe())
// TODO: Instead of letting the lease expire, the holder may delete it directly
// This will not be compatible with all controllers, so it needs to be opt-in behavior.
// We must ensure all code guarded by this lease has successfully completed
@@ -406,7 +409,7 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
// update the lock itself
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
logger.Error(err, "Failed to update lock", "lock", le.config.Lock.Describe())
return false
}
@@ -418,6 +421,7 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
logger := klog.FromContext(ctx)
now := metav1.NewTime(le.clock.Now())
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
@@ -438,18 +442,18 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
le.setObservedRecord(&leaderElectionRecord)
return true
}
klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
logger.Error(err, "Failed to update lease optimistically, falling back to slow path", "lock", le.config.Lock.Describe())
}
// 2. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
logger.Error(err, "Error retrieving lease lock", "lock", le.config.Lock.Describe())
return false
}
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
logger.Error(err, "Error initially creating lease lock", "lock", le.config.Lock.Describe())
return false
}
@@ -465,7 +469,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
le.observedRawRecord = oldLeaderElectionRawRecord
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
logger.V(4).Info("Lease is held by and has not yet expired", "lock", le.config.Lock.Describe(), "holder", oldLeaderElectionRecord.HolderIdentity)
return false
}
@@ -481,7 +485,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
// update the lock itself
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
logger.Error(err, "Failed to update lease", "lock", le.config.Lock.Describe())
return false
}

View File

@@ -37,6 +37,7 @@ import (
fakeclient "k8s.io/client-go/testing"
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock"
)
@@ -265,6 +266,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
@@ -316,10 +319,10 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
clock: clock,
metrics: globalMetricsFactory.newLeaderMetrics(),
}
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
if test.expectSuccess != le.tryAcquireOrRenew(ctx) {
if test.retryAfter != 0 {
time.Sleep(test.retryAfter)
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
if test.expectSuccess != le.tryAcquireOrRenew(ctx) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
} else {
@@ -411,6 +414,8 @@ func TestTryCoordinatedRenew(t *testing.T) {
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
@@ -457,10 +462,10 @@ func TestTryCoordinatedRenew(t *testing.T) {
clock: clock,
metrics: globalMetricsFactory.newLeaderMetrics(),
}
if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
if test.expectSuccess != le.tryCoordinatedRenew(ctx) {
if test.retryAfter != 0 {
time.Sleep(test.retryAfter)
if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
if test.expectSuccess != le.tryCoordinatedRenew(ctx) {
t.Errorf("unexpected result of tryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
}
} else {
@@ -590,6 +595,8 @@ func testReleaseLease(t *testing.T, objectType string) {
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
@@ -641,7 +648,7 @@ func testReleaseLease(t *testing.T, objectType string) {
clock: clock.RealClock{},
metrics: globalMetricsFactory.newLeaderMetrics(),
}
if !le.tryAcquireOrRenew(context.Background()) {
if !le.tryAcquireOrRenew(ctx) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
}
@@ -651,7 +658,7 @@ func testReleaseLease(t *testing.T, objectType string) {
wg.Wait()
wg.Add(1)
if test.expectSuccess != le.release() {
if test.expectSuccess != le.release(logger) {
t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
}
@@ -686,6 +693,7 @@ func TestReleaseLeaseLeases(t *testing.T) {
// TestReleaseMethodCallsGet tests that the release method calls Get
func TestReleaseMethodCallsGet(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
objectType := "leases"
getCalled := false
@@ -730,7 +738,7 @@ func TestReleaseMethodCallsGet(t *testing.T) {
metrics: globalMetricsFactory.newLeaderMetrics(),
}
le.release()
le.release(logger)
if !getCalled {
t.Errorf("release method does not call Get")
@@ -903,6 +911,8 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
wg.Add(1)
resetVars()
@@ -930,7 +940,7 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
t.Fatal("Failed to create leader elector: ", err)
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
go elector.Run(ctx)
@@ -1144,6 +1154,8 @@ func TestFastPathLeaderElection(t *testing.T) {
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
resetVars()
recorder := record.NewFakeRecorder(100)
@@ -1170,7 +1182,7 @@ func TestFastPathLeaderElection(t *testing.T) {
t.Fatal("Failed to create leader elector: ", err)
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
cancelFunc = cancel
elector.Run(ctx)
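
In the tests, context.Background() is replaced by a context created with ktesting.NewTestContext, so log output produced through klog.FromContext lands on the testing.T of the running test. A minimal sketch of that pattern (TestExample and doWork are hypothetical names, not part of the patch):

    package example

    import (
        "context"
        "testing"

        "k8s.io/klog/v2"
        "k8s.io/klog/v2/ktesting"
    )

    // doWork stands in for code under test that logs via its context,
    // such as tryAcquireOrRenew.
    func doWork(ctx context.Context) {
        klog.FromContext(ctx).V(4).Info("Doing work")
    }

    func TestExample(t *testing.T) {
        // NewTestContext returns a logger plus a context carrying it; output
        // from either ends up in the log of this test.
        logger, ctx := ktesting.NewTestContext(t)
        logger.Info("Starting")
        doWork(ctx)
    }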

View File

@@ -120,8 +120,12 @@ func NewCandidate(clientset kubernetes.Interface,
func (c *LeaseCandidate) Run(ctx context.Context) {
defer c.queue.ShutDown()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "leasecandidate")
ctx = klog.NewContext(ctx, logger)
c.informerFactory.Start(ctx.Done())
if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) {
if !cache.WaitForNamedCacheSyncWithContext(ctx, c.hasSynced) {
return
}
@@ -148,7 +152,7 @@ func (c *LeaseCandidate) processNextWorkItem(ctx context.Context) bool {
return true
}
utilruntime.HandleError(err)
utilruntime.HandleErrorWithContext(ctx, err, "Ensuring lease failed")
c.queue.AddRateLimited(key)
return true
@@ -161,20 +165,21 @@ func (c *LeaseCandidate) enqueueLease() {
// ensureLease creates the lease if it does not exist and renews it if it exists.
// Returns any error that occurs.
func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
logger := klog.FromContext(ctx)
lease, err := c.leaseClient.Get(ctx, c.name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
klog.V(2).Infof("Creating lease candidate")
logger.V(2).Info("Creating lease candidate")
// lease does not exist, create it.
leaseToCreate := c.newLeaseCandidate()
if _, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}); err != nil {
return err
}
klog.V(2).Infof("Created lease candidate")
logger.V(2).Info("Created lease candidate")
return nil
} else if err != nil {
return err
}
klog.V(2).Infof("lease candidate exists. Renewing.")
logger.V(2).Info("Lease candidate exists. Renewing.")
clone := lease.DeepCopy()
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
_, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{})
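
Run attaches a named logger to the context once, so every callee that retrieves the logger with klog.FromContext, including ensureLease, logs under the same name. A minimal sketch of that propagation (runExample and workExample are hypothetical names):

    package example

    import (
        "context"

        "k8s.io/klog/v2"
    )

    func runExample(ctx context.Context) {
        // Name the logger and store it back into the context; callees inherit it.
        logger := klog.FromContext(ctx)
        logger = klog.LoggerWithName(logger, "leasecandidate")
        ctx = klog.NewContext(ctx, logger)
        workExample(ctx)
    }

    func workExample(ctx context.Context) {
        // Logs under the "leasecandidate" name without receiving a logger explicitly.
        klog.FromContext(ctx).V(2).Info("Creating lease candidate")
    }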

View File

@@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
)
type testcase struct {
@@ -34,6 +35,7 @@ type testcase struct {
}
func TestLeaseCandidateCreation(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
tc := testcase{
candidateName: "foo",
candidateNamespace: "default",
@@ -42,7 +44,7 @@ func TestLeaseCandidateCreation(t *testing.T) {
emulationVersion: "1.30.0",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
client := fake.NewSimpleClientset()
@@ -67,6 +69,8 @@ func TestLeaseCandidateCreation(t *testing.T) {
}
func TestLeaseCandidateAck(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
tc := testcase{
candidateName: "foo",
candidateNamespace: "default",
@@ -75,7 +79,7 @@ func TestLeaseCandidateAck(t *testing.T) {
emulationVersion: "1.30.0",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
client := fake.NewSimpleClientset()