From b65019457ba4a02dcf39b8da65a759123984b0a5 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Mon, 25 Aug 2025 16:28:53 +0200
Subject: [PATCH] client-go leader-election: structured, contextual logging

Kubernetes-commit: 63f304708a0fab5078739415f589eff9f2e9dfc7
---
 tools/leaderelection/leaderelection.go      | 48 +++++++++++----------
 tools/leaderelection/leaderelection_test.go | 30 +++++++++----
 tools/leaderelection/leasecandidate.go      | 15 ++++---
 tools/leaderelection/leasecandidate_test.go |  8 +++-
 4 files changed, 63 insertions(+), 38 deletions(-)

diff --git a/tools/leaderelection/leaderelection.go b/tools/leaderelection/leaderelection.go
index 07180630..29d34c4e 100644
--- a/tools/leaderelection/leaderelection.go
+++ b/tools/leaderelection/leaderelection.go
@@ -209,7 +209,7 @@ type LeaderElector struct {
 // before leader election loop is stopped by ctx or it has
 // stopped holding the leader lease
 func (le *LeaderElector) Run(ctx context.Context) {
-	defer runtime.HandleCrash()
+	defer runtime.HandleCrashWithContext(ctx)
 	defer le.config.Callbacks.OnStoppedLeading()
 
 	if !le.acquire(ctx) {
@@ -254,7 +254,8 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
 	defer cancel()
 	succeeded := false
 	desc := le.config.Lock.Describe()
-	klog.Infof("attempting to acquire leader lease %v...", desc)
+	logger := klog.FromContext(ctx)
+	logger.Info("Attempting to acquire leader lease...", "lock", desc)
 	wait.JitterUntil(func() {
 		if !le.config.Coordinated {
 			succeeded = le.tryAcquireOrRenew(ctx)
@@ -263,12 +264,12 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
 		}
 		le.maybeReportTransition()
 		if !succeeded {
-			klog.V(4).Infof("failed to acquire lease %v", desc)
+			logger.V(4).Info("Failed to acquire lease", "lock", desc)
 			return
 		}
 		le.config.Lock.RecordEvent("became leader")
 		le.metrics.leaderOn(le.config.Name)
-		klog.Infof("successfully acquired lease %v", desc)
+		logger.Info("Successfully acquired lease", "lock", desc)
 		cancel()
 	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
 	return succeeded
@@ -279,6 +280,7 @@ func (le *LeaderElector) renew(ctx context.Context) {
 	defer le.config.Lock.RecordEvent("stopped leading")
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
+	logger := klog.FromContext(ctx)
 	wait.Until(func() {
 		err := wait.PollUntilContextTimeout(ctx, le.config.RetryPeriod, le.config.RenewDeadline, true, func(ctx context.Context) (done bool, err error) {
 			if !le.config.Coordinated {
@@ -290,22 +292,22 @@ func (le *LeaderElector) renew(ctx context.Context) {
 		le.maybeReportTransition()
 		desc := le.config.Lock.Describe()
 		if err == nil {
-			klog.V(5).Infof("successfully renewed lease %v", desc)
+			logger.V(5).Info("Successfully renewed lease", "lock", desc)
 			return
 		}
 		le.metrics.leaderOff(le.config.Name)
-		klog.Infof("failed to renew lease %v: %v", desc, err)
+		logger.Info("Failed to renew lease", "lock", desc, "err", err)
 		cancel()
 	}, le.config.RetryPeriod, ctx.Done())
 
 	// if we hold the lease, give it up
 	if le.config.ReleaseOnCancel {
-		le.release()
+		le.release(logger)
 	}
 }
 
 // release attempts to release the leader lease if we have acquired it.
-func (le *LeaderElector) release() bool {
+func (le *LeaderElector) release(logger klog.Logger) bool {
 	ctx := context.Background()
 	timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
 	defer timeoutCancel()
@@ -313,10 +315,10 @@ func (le *LeaderElector) release() bool {
 	oldLeaderElectionRecord, _, err := le.config.Lock.Get(timeoutCtx)
 	if err != nil {
 		if !errors.IsNotFound(err) {
-			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
+			logger.Error(err, "Error retrieving resource lock", "lock", le.config.Lock.Describe())
 			return false
 		}
-		klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
+		logger.Info("Lease lock not found", "lock", le.config.Lock.Describe())
 		return false
 	}
 
@@ -331,7 +333,7 @@ func (le *LeaderElector) release() bool {
 		AcquireTime: now,
 	}
 	if err := le.config.Lock.Update(timeoutCtx, leaderElectionRecord); err != nil {
-		klog.Errorf("Failed to release lock: %v", err)
+		logger.Error(err, "Failed to release lease", "lock", le.config.Lock.Describe())
 		return false
 	}
 
@@ -343,6 +345,7 @@ func (le *LeaderElector) release() bool {
 // lease if it has already been acquired. Returns true on success else returns
 // false.
 func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
+	logger := klog.FromContext(ctx)
 	now := metav1.NewTime(le.clock.Now())
 	leaderElectionRecord := rl.LeaderElectionRecord{
 		HolderIdentity: le.config.Lock.Identity(),
@@ -355,10 +358,10 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
 	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
 	if err != nil {
 		if !errors.IsNotFound(err) {
-			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
+			logger.Error(err, "Error retrieving lease lock", "lock", le.config.Lock.Describe())
 			return false
 		}
-		klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
+		logger.Info("Lease lock not found", "lock", le.config.Lock.Describe(), "err", err)
 		return false
 	}
 
@@ -371,18 +374,18 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
 	hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time)
 
 	if hasExpired {
-		klog.Infof("lock has expired: %v", le.config.Lock.Describe())
+		logger.Info("Lease has expired", "lock", le.config.Lock.Describe())
 		return false
 	}
 
 	if !le.IsLeader() {
-		klog.V(6).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe())
+		logger.V(6).Info("Lease is held and has not yet expired", "lock", le.config.Lock.Describe(), "holder", oldLeaderElectionRecord.HolderIdentity)
 		return false
 	}
 
 	// 2b. If the lease has been marked as "end of term", don't renew it
 	if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" {
-		klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe())
+		logger.V(4).Info("Lease is marked as 'end of term'", "lock", le.config.Lock.Describe())
 		// TODO: Instead of letting lease expire, the holder may deleted it directly
 		// This will not be compatible with all controllers, so it needs to be opt-in behavior.
 		// We must ensure all code guarded by this lease has successfully completed
@@ -406,7 +409,7 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
 
 	// update the lock itself
 	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
-		klog.Errorf("Failed to update lock: %v", err)
+		logger.Error(err, "Failed to update lock", "lock", le.config.Lock.Describe())
 		return false
 	}
 
@@ -418,6 +421,7 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
 // else it tries to renew the lease if it has already been acquired. Returns true
 // on success else returns false.
 func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
+	logger := klog.FromContext(ctx)
 	now := metav1.NewTime(le.clock.Now())
 	leaderElectionRecord := rl.LeaderElectionRecord{
 		HolderIdentity: le.config.Lock.Identity(),
@@ -438,18 +442,18 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 			le.setObservedRecord(&leaderElectionRecord)
 			return true
 		}
-		klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
+		logger.Error(err, "Failed to update lease optimistically, falling back to slow path", "lock", le.config.Lock.Describe())
 	}
 
 	// 2. obtain or create the ElectionRecord
 	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
 	if err != nil {
 		if !errors.IsNotFound(err) {
-			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
+			logger.Error(err, "Error retrieving lease lock", "lock", le.config.Lock.Describe())
 			return false
 		}
 		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
-			klog.Errorf("error initially creating leader election record: %v", err)
+			logger.Error(err, "Error initially creating lease lock", "lock", le.config.Lock.Describe())
 			return false
 		}
 
@@ -465,7 +469,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 		le.observedRawRecord = oldLeaderElectionRawRecord
 	}
 	if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
-		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
+		logger.V(4).Info("Lease is held and has not yet expired", "lock", le.config.Lock.Describe(), "holder", oldLeaderElectionRecord.HolderIdentity)
 		return false
 	}
 
@@ -481,7 +485,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 
 	// update the lock itself
 	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
-		klog.Errorf("Failed to update lock: %v", err)
+		logger.Error(err, "Failed to update lease", "lock", le.config.Lock.Describe())
 		return false
 	}
 
diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go
index eff46177..6762a32d 100644
--- a/tools/leaderelection/leaderelection_test.go
+++ b/tools/leaderelection/leaderelection_test.go
@@ -37,6 +37,7 @@ import (
 	fakeclient "k8s.io/client-go/testing"
 	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/utils/clock"
 )
 
@@ -265,6 +266,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
 	for i := range tests {
 		test := &tests[i]
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+
 			// OnNewLeader is called async so we have to wait for it.
 			var wg sync.WaitGroup
 			wg.Add(1)
 
@@ -316,10 +319,10 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
 				clock: clock,
 				metrics: globalMetricsFactory.newLeaderMetrics(),
 			}
-			if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
+			if test.expectSuccess != le.tryAcquireOrRenew(ctx) {
 				if test.retryAfter != 0 {
 					time.Sleep(test.retryAfter)
-					if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
+					if test.expectSuccess != le.tryAcquireOrRenew(ctx) {
 						t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
 					}
 				} else {
@@ -411,6 +414,8 @@ func TestTryCoordinatedRenew(t *testing.T) {
 	for i := range tests {
 		test := &tests[i]
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+
 			// OnNewLeader is called async so we have to wait for it.
 			var wg sync.WaitGroup
 			wg.Add(1)
@@ -457,10 +462,10 @@ func TestTryCoordinatedRenew(t *testing.T) {
 				clock: clock,
 				metrics: globalMetricsFactory.newLeaderMetrics(),
 			}
-			if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
+			if test.expectSuccess != le.tryCoordinatedRenew(ctx) {
 				if test.retryAfter != 0 {
 					time.Sleep(test.retryAfter)
-					if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
+					if test.expectSuccess != le.tryCoordinatedRenew(ctx) {
 						t.Errorf("unexpected result of tryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
 					}
 				} else {
@@ -590,6 +595,8 @@ func testReleaseLease(t *testing.T, objectType string) {
 	for i := range tests {
 		test := &tests[i]
 		t.Run(test.name, func(t *testing.T) {
+			logger, ctx := ktesting.NewTestContext(t)
+
 			// OnNewLeader is called async so we have to wait for it.
 			var wg sync.WaitGroup
 			wg.Add(1)
@@ -641,7 +648,7 @@ func testReleaseLease(t *testing.T, objectType string) {
 				clock: clock.RealClock{},
 				metrics: globalMetricsFactory.newLeaderMetrics(),
 			}
-			if !le.tryAcquireOrRenew(context.Background()) {
+			if !le.tryAcquireOrRenew(ctx) {
 				t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
 			}
 
@@ -651,7 +658,7 @@ func testReleaseLease(t *testing.T, objectType string) {
 			wg.Wait()
 			wg.Add(1)
 
-			if test.expectSuccess != le.release() {
+			if test.expectSuccess != le.release(logger) {
 				t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
 			}
 
@@ -686,6 +693,7 @@ func TestReleaseLeaseLeases(t *testing.T) {
 
 // TestReleaseMethodCallsGet test release method calls Get
 func TestReleaseMethodCallsGet(t *testing.T) {
+	logger, _ := ktesting.NewTestContext(t)
 	objectType := "leases"
 	getCalled := false
 
@@ -730,7 +738,7 @@ func TestReleaseMethodCallsGet(t *testing.T) {
 		metrics: globalMetricsFactory.newLeaderMetrics(),
 	}
 
-	le.release()
+	le.release(logger)
 
 	if !getCalled {
 		t.Errorf("release method does not call Get")
@@ -903,6 +911,8 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
 	for i := range tests {
 		test := &tests[i]
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+
 			wg.Add(1)
 
 			resetVars()
@@ -930,7 +940,7 @@
 				t.Fatal("Failed to create leader elector: ", err)
 			}
 
-			ctx, cancel := context.WithCancel(context.Background())
+			ctx, cancel := context.WithCancel(ctx)
 
 			go elector.Run(ctx)
 
@@ -1144,6 +1154,8 @@ func TestFastPathLeaderElection(t *testing.T) {
 	for i := range tests {
 		test := &tests[i]
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+
 			resetVars()
 
 			recorder := record.NewFakeRecorder(100)
@@ -1170,7 +1182,7 @@
t.Fatal("Failed to create leader elector: ", err) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) cancelFunc = cancel elector.Run(ctx) diff --git a/tools/leaderelection/leasecandidate.go b/tools/leaderelection/leasecandidate.go index 9aaf779e..b2fa14a5 100644 --- a/tools/leaderelection/leasecandidate.go +++ b/tools/leaderelection/leasecandidate.go @@ -120,8 +120,12 @@ func NewCandidate(clientset kubernetes.Interface, func (c *LeaseCandidate) Run(ctx context.Context) { defer c.queue.ShutDown() + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "leasecandidate") + ctx = klog.NewContext(ctx, logger) + c.informerFactory.Start(ctx.Done()) - if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) { + if !cache.WaitForNamedCacheSyncWithContext(ctx, c.hasSynced) { return } @@ -148,7 +152,7 @@ func (c *LeaseCandidate) processNextWorkItem(ctx context.Context) bool { return true } - utilruntime.HandleError(err) + utilruntime.HandleErrorWithContext(ctx, err, "Ensuring lease failed") c.queue.AddRateLimited(key) return true @@ -161,20 +165,21 @@ func (c *LeaseCandidate) enqueueLease() { // ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and // a bool (true if this call created the lease), or any error that occurs. func (c *LeaseCandidate) ensureLease(ctx context.Context) error { + logger := klog.FromContext(ctx) lease, err := c.leaseClient.Get(ctx, c.name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - klog.V(2).Infof("Creating lease candidate") + logger.V(2).Info("Creating lease candidate") // lease does not exist, create it. leaseToCreate := c.newLeaseCandidate() if _, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}); err != nil { return err } - klog.V(2).Infof("Created lease candidate") + logger.V(2).Info("Created lease candidate") return nil } else if err != nil { return err } - klog.V(2).Infof("lease candidate exists. Renewing.") + logger.V(2).Info("Lease candidate exists. 
Renewing.") clone := lease.DeepCopy() clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} _, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{}) diff --git a/tools/leaderelection/leasecandidate_test.go b/tools/leaderelection/leasecandidate_test.go index 3099a4ea..efd51f46 100644 --- a/tools/leaderelection/leasecandidate_test.go +++ b/tools/leaderelection/leasecandidate_test.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" ) type testcase struct { @@ -34,6 +35,7 @@ type testcase struct { } func TestLeaseCandidateCreation(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) tc := testcase{ candidateName: "foo", candidateNamespace: "default", @@ -42,7 +44,7 @@ func TestLeaseCandidateCreation(t *testing.T) { emulationVersion: "1.30.0", } - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() client := fake.NewSimpleClientset() @@ -67,6 +69,8 @@ func TestLeaseCandidateCreation(t *testing.T) { } func TestLeaseCandidateAck(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + tc := testcase{ candidateName: "foo", candidateNamespace: "default", @@ -75,7 +79,7 @@ func TestLeaseCandidateAck(t *testing.T) { emulationVersion: "1.30.0", } - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() client := fake.NewSimpleClientset()