From 1d9f7fd516b4787f5ef32692711d5ae3031e794e Mon Sep 17 00:00:00 2001
From: Eric Lin
Date: Sat, 25 Nov 2023 22:04:32 +0000
Subject: [PATCH 1/2] leaderelection: optimistically update leader lock

Signed-off-by: Eric Lin
---
 .../tools/leaderelection/leaderelection.go    |  30 +++-
 .../leaderelection/leaderelection_test.go     | 167 +++++++++++++++++-
 2 files changed, 188 insertions(+), 9 deletions(-)

diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go
index c1151baf207..af840c4a251 100644
--- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go
@@ -325,7 +325,22 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 		AcquireTime:          now,
 	}
 
-	// 1. obtain or create the ElectionRecord
+	// 1. fast path for the leader to update optimistically assuming that the record observed
+	// last time is the current version.
+	if le.IsLeader() && le.isLeaseValid(now.Time) {
+		oldObservedRecord := le.getObservedRecord()
+		leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
+		leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
+
+		err := le.config.Lock.Update(ctx, leaderElectionRecord)
+		if err == nil {
+			le.setObservedRecord(&leaderElectionRecord)
+			return true
+		}
+		klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
+	}
+
+	// 2. obtain or create the ElectionRecord
 	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
 	if err != nil {
 		if !errors.IsNotFound(err) {
@@ -342,24 +357,23 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 		return true
 	}
 
-	// 2. Record obtained, check the Identity & Time
+	// 3. Record obtained, check the Identity & Time
 	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
 		le.setObservedRecord(oldLeaderElectionRecord)
 
 		le.observedRawRecord = oldLeaderElectionRawRecord
 	}
-	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
-		le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
-		!le.IsLeader() {
+	if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
 		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
 		return false
 	}
 
-	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
+	// 4. We're going to try to update. The leaderElectionRecord is set to its default
 	// here. Let's correct it before updating.
 	if le.IsLeader() {
 		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
 		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
+		le.metrics.slowpathExercised(le.config.Name)
 	} else {
 		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
 	}
@@ -400,6 +414,10 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
 	return nil
 }
 
+func (le *LeaderElector) isLeaseValid(now time.Time) bool {
+	return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
+}
+
 // setObservedRecord will set a new observedRecord and update observedTime to the current time.
 // Protect critical sections with lock.
 func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
index 02fa6a1d4dc..8c94b35a217 100644
--- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
@@ -610,14 +610,18 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
 			objectType: objectType,
 			reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
 				updates++
+				// Skip initial two fast path renews
+				if updates%2 == 1 && updates < 5 {
+					return true, nil, context.Canceled
+				}
 
 				// Second update (first renew) should return our canceled error
 				// FakeClient doesn't do anything with the context so we're doing this ourselves
-				if updates == 2 {
+				if updates == 4 {
 					close(onRenewCalled)
 					<-onRenewResume
 					return true, nil, context.Canceled
-				} else if updates == 3 {
+				} else if updates == 5 {
 					// We update the lock after the cancellation to release it
 					// This wg is to avoid the data race on lockObj
 					defer wg.Done()
@@ -668,8 +672,12 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
 			objectType: objectType,
 			reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
 				updates++
+				// Always skip fast path renew
+				if updates%2 == 1 {
+					return true, nil, context.Canceled
+				}
 				// Second update (first renew) should release the lock
-				if updates == 2 {
+				if updates == 4 {
 					// We update the lock after the cancellation to release it
 					// This wg is to avoid the data race on lockObj
 					defer wg.Done()
@@ -813,3 +821,156 @@ func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) {
 		}
 	}
 }
+
+func TestFastPathLeaderElection(t *testing.T) {
+	objectType := "leases"
+	var (
+		lockObj    runtime.Object
+		updates    int
+		lockOps    []string
+		cancelFunc func()
+	)
+	resetVars := func() {
+		lockObj = nil
+		updates = 0
+		lockOps = []string{}
+		cancelFunc = nil
+	}
+	lec := LeaderElectionConfig{
+		LeaseDuration: 15 * time.Second,
+		RenewDeadline: 2 * time.Second,
+		RetryPeriod:   1 * time.Second,
+
+		Callbacks: LeaderCallbacks{
+			OnNewLeader:      func(identity string) {},
+			OnStoppedLeading: func() {},
+			OnStartedLeading: func(context.Context) {
+			},
+		},
+	}
+
+	tests := []struct {
+		name            string
+		reactors        []Reactor
+		expectedLockOps []string
+	}{
+		{
+			name: "Exercise fast path after lock acquired",
+			reactors: []Reactor{
+				{
+					verb:       "get",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						lockOps = append(lockOps, "get")
+						if lockObj != nil {
+							return true, lockObj, nil
+						}
+						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
+					},
+				},
+				{
+					verb:       "create",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						lockOps = append(lockOps, "create")
+						lockObj = action.(fakeclient.CreateAction).GetObject()
+						return true, lockObj, nil
+					},
+				},
+				{
+					verb:       "update",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						updates++
+						lockOps = append(lockOps, "update")
+						if updates == 2 {
+							cancelFunc()
+						}
+						lockObj = action.(fakeclient.UpdateAction).GetObject()
+						return true, lockObj, nil
+					},
+				},
+			},
+			expectedLockOps: []string{"get", "create", "update", "update"},
+		},
+		{
+			name: "Fallback to slow path after fast path fails",
+			reactors: []Reactor{
+				{
+					verb:       "get",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						lockOps = append(lockOps, "get")
+						if lockObj != nil {
+							return true, lockObj, nil
+						}
+						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
+					},
+				},
+				{
+					verb:       "create",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						lockOps = append(lockOps, "create")
+						lockObj = action.(fakeclient.CreateAction).GetObject()
+						return true, lockObj, nil
+					},
+				},
+				{
+					verb:       "update",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						updates++
+						lockOps = append(lockOps, "update")
+						switch updates {
+						case 2:
+							return true, nil, errors.NewConflict(action.(fakeclient.UpdateAction).GetResource().GroupResource(), "fake conflict", nil)
+						case 4:
+							cancelFunc()
+						}
+						lockObj = action.(fakeclient.UpdateAction).GetObject()
+						return true, lockObj, nil
+					},
+				},
+			},
+			expectedLockOps: []string{"get", "create", "update", "update", "get", "update", "update"},
+		},
+	}
+
+	for i := range tests {
+		test := &tests[i]
+		t.Run(test.name, func(t *testing.T) {
+			resetVars()
+
+			recorder := record.NewFakeRecorder(100)
+			resourceLockConfig := rl.ResourceLockConfig{
+				Identity:      "baz",
+				EventRecorder: recorder,
+			}
+			c := &fake.Clientset{}
+			for _, reactor := range test.reactors {
+				c.AddReactor(reactor.verb, objectType, reactor.reaction)
+			}
+			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
+				t.Errorf("unreachable action. testclient called too many times: %+v", action)
+				return true, nil, fmt.Errorf("unreachable action")
+			})
+			lock, err := rl.New("leases", "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
+			if err != nil {
+				t.Fatal("resourcelock.New() = ", err)
+			}
+
+			lec.Lock = lock
+			elector, err := NewLeaderElector(lec)
+			if err != nil {
+				t.Fatal("Failed to create leader elector: ", err)
+			}
+
+			ctx, cancel := context.WithCancel(context.Background())
+			cancelFunc = cancel
+
+			elector.Run(ctx)
+			assert.Equal(t, test.expectedLockOps, lockOps, "Expected lock ops %q, got %q", test.expectedLockOps, lockOps)
+		})
+	}
+}

From 1e54c050936be1a1e3e5758718ebca86096dbaea Mon Sep 17 00:00:00 2001
From: Eric Lin
Date: Mon, 27 Nov 2023 13:10:24 +0000
Subject: [PATCH 2/2] leaderelection: Instrument for when slowpath is exercised

Signed-off-by: Eric Lin
---
 .../leaderelection/leaderelection_test.go     |  2 ++
 .../client-go/tools/leaderelection/metrics.go | 30 ++++++++++++-------
 .../clientgo/leaderelection/metrics.go        | 26 ++++++++++++----
 3 files changed, 42 insertions(+), 16 deletions(-)

diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
index 8c94b35a217..de481e0adfc 100644
--- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
@@ -315,6 +315,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
 			observedRawRecord: observedRawRecord,
 			observedTime:      test.observedTime,
 			clock:             clock,
+			metrics:           globalMetricsFactory.newLeaderMetrics(),
 		}
 		if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
 			if test.retryAfter != 0 {
@@ -491,6 +492,7 @@ func testReleaseLease(t *testing.T, objectType string) {
 			observedRawRecord: observedRawRecord,
 			observedTime:      test.observedTime,
 			clock:             clock.RealClock{},
+			metrics:           globalMetricsFactory.newLeaderMetrics(),
 		}
 		if !le.tryAcquireOrRenew(context.Background()) {
 			t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/metrics.go b/staging/src/k8s.io/client-go/tools/leaderelection/metrics.go
index 65917bf88e1..7438345fb15 100644
--- a/staging/src/k8s.io/client-go/tools/leaderelection/metrics.go
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/metrics.go
@@ -26,24 +26,26 @@ import (
 type leaderMetricsAdapter interface {
 	leaderOn(name string)
 	leaderOff(name string)
+	slowpathExercised(name string)
 }
 
-// GaugeMetric represents a single numerical value that can arbitrarily go up
-// and down.
-type SwitchMetric interface {
+// LeaderMetric instruments metrics used in leader election.
+type LeaderMetric interface {
 	On(name string)
 	Off(name string)
+	SlowpathExercised(name string)
 }
 
 type noopMetric struct{}
 
-func (noopMetric) On(name string)  {}
-func (noopMetric) Off(name string) {}
+func (noopMetric) On(name string)                {}
+func (noopMetric) Off(name string)               {}
+func (noopMetric) SlowpathExercised(name string) {}
 
 // defaultLeaderMetrics expects the caller to lock before setting any metrics.
 type defaultLeaderMetrics struct {
 	// leader's value indicates if the current process is the owner of name lease
-	leader SwitchMetric
+	leader LeaderMetric
 }
 
 func (m *defaultLeaderMetrics) leaderOn(name string) {
@@ -60,19 +62,27 @@ func (m *defaultLeaderMetrics) leaderOff(name string) {
 	m.leader.Off(name)
 }
 
+func (m *defaultLeaderMetrics) slowpathExercised(name string) {
+	if m == nil {
+		return
+	}
+	m.leader.SlowpathExercised(name)
+}
+
 type noMetrics struct{}
 
-func (noMetrics) leaderOn(name string)  {}
-func (noMetrics) leaderOff(name string) {}
+func (noMetrics) leaderOn(name string)          {}
+func (noMetrics) leaderOff(name string)         {}
+func (noMetrics) slowpathExercised(name string) {}
 
 // MetricsProvider generates various metrics used by the leader election.
 type MetricsProvider interface {
-	NewLeaderMetric() SwitchMetric
+	NewLeaderMetric() LeaderMetric
 }
 
 type noopMetricsProvider struct{}
 
-func (_ noopMetricsProvider) NewLeaderMetric() SwitchMetric {
+func (noopMetricsProvider) NewLeaderMetric() LeaderMetric {
 	return noopMetric{}
 }
 
diff --git a/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go b/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
index fd85d136cb0..004767bd766 100644
--- a/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
+++ b/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go
@@ -28,27 +28,41 @@ var (
 		StabilityLevel: k8smetrics.ALPHA,
 		Help:           "Gauge of if the reporting system is master of the relevant lease, 0 indicates backup, 1 indicates master. 'name' is the string used to identify the lease. Please make sure to group by name.",
 	}, []string{"name"})
+	// A cumulative counter should be sufficient to get a rough ratio of slow-path
+	// renewals, given that the leader election frequency is specified explicitly.
+	// This also avoids the overhead of reporting a counter each time the fast path is exercised.
+	leaderSlowpathCounter = k8smetrics.NewCounterVec(&k8smetrics.CounterOpts{
+		Name:           "leader_election_slowpath_total",
+		StabilityLevel: k8smetrics.ALPHA,
+		Help:           "Total number of times the slow path was exercised in renewing leader leases. 'name' is the string used to identify the lease. Please make sure to group by name.",
+	}, []string{"name"})
 )
 
 func init() {
 	legacyregistry.MustRegister(leaderGauge)
+	legacyregistry.MustRegister(leaderSlowpathCounter)
 	leaderelection.SetProvider(prometheusMetricsProvider{})
 }
 
 type prometheusMetricsProvider struct{}
 
-func (prometheusMetricsProvider) NewLeaderMetric() leaderelection.SwitchMetric {
-	return &switchAdapter{gauge: leaderGauge}
+func (prometheusMetricsProvider) NewLeaderMetric() leaderelection.LeaderMetric {
+	return &leaderAdapter{gauge: leaderGauge, counter: leaderSlowpathCounter}
 }
 
-type switchAdapter struct {
-	gauge *k8smetrics.GaugeVec
+type leaderAdapter struct {
+	gauge   *k8smetrics.GaugeVec
+	counter *k8smetrics.CounterVec
 }
 
-func (s *switchAdapter) On(name string) {
+func (s *leaderAdapter) On(name string) {
 	s.gauge.WithLabelValues(name).Set(1.0)
 }
 
-func (s *switchAdapter) Off(name string) {
+func (s *leaderAdapter) Off(name string) {
 	s.gauge.WithLabelValues(name).Set(0.0)
 }
+
+func (s *leaderAdapter) SlowpathExercised(name string) {
+	s.counter.WithLabelValues(name).Inc()
+}