From 1d9f7fd516b4787f5ef32692711d5ae3031e794e Mon Sep 17 00:00:00 2001
From: Eric Lin
Date: Sat, 25 Nov 2023 22:04:32 +0000
Subject: [PATCH] leaderelection: optimistically update leader lock

Signed-off-by: Eric Lin
---
 .../tools/leaderelection/leaderelection.go    |  30 +++-
 .../leaderelection/leaderelection_test.go     | 167 +++++++++++++++++-
 2 files changed, 188 insertions(+), 9 deletions(-)

diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go
index c1151baf207..af840c4a251 100644
--- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go
@@ -325,7 +325,22 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 		AcquireTime:          now,
 	}
 
-	// 1. obtain or create the ElectionRecord
+	// 1. fast path for the leader to update optimistically assuming that the record observed
+	// last time is the current version.
+	if le.IsLeader() && le.isLeaseValid(now.Time) {
+		oldObservedRecord := le.getObservedRecord()
+		leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
+		leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
+
+		err := le.config.Lock.Update(ctx, leaderElectionRecord)
+		if err == nil {
+			le.setObservedRecord(&leaderElectionRecord)
+			return true
+		}
+		klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
+	}
+
+	// 2. obtain or create the ElectionRecord
 	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
 	if err != nil {
 		if !errors.IsNotFound(err) {
@@ -342,24 +357,23 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 		return true
 	}
 
-	// 2. Record obtained, check the Identity & Time
+	// 3. Record obtained, check the Identity & Time
 	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
 		le.setObservedRecord(oldLeaderElectionRecord)
 
 		le.observedRawRecord = oldLeaderElectionRawRecord
 	}
-	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
-		le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
-		!le.IsLeader() {
+	if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
 		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
 		return false
 	}
 
-	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
+	// 4. We're going to try to update. The leaderElectionRecord is set to its default
 	// here. Let's correct it before updating.
 	if le.IsLeader() {
 		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
 		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
+		le.metrics.slowpathExercised(le.config.Name)
 	} else {
 		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
 	}
@@ -400,6 +414,10 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
 	return nil
 }
 
+func (le *LeaderElector) isLeaseValid(now time.Time) bool {
+	return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
+}
+
 // setObservedRecord will set a new observedRecord and update observedTime to the current time.
 // Protect critical sections with lock.
 func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {

diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
index 02fa6a1d4dc..8c94b35a217 100644
--- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go
@@ -610,14 +610,18 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
 			objectType: objectType,
 			reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
 				updates++
+				// Skip initial two fast path renews
+				if updates%2 == 1 && updates < 5 {
+					return true, nil, context.Canceled
+				}
 
 				// Second update (first renew) should return our canceled error
 				// FakeClient doesn't do anything with the context so we're doing this ourselves
-				if updates == 2 {
+				if updates == 4 {
 					close(onRenewCalled)
 					<-onRenewResume
 					return true, nil, context.Canceled
-				} else if updates == 3 {
+				} else if updates == 5 {
 					// We update the lock after the cancellation to release it
 					// This wg is to avoid the data race on lockObj
 					defer wg.Done()
@@ -668,8 +672,12 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
 			objectType: objectType,
 			reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
 				updates++
+				// Always skip fast path renew
+				if updates%2 == 1 {
+					return true, nil, context.Canceled
+				}
 
 				// Second update (first renew) should release the lock
-				if updates == 2 {
+				if updates == 4 {
 					// We update the lock after the cancellation to release it
 					// This wg is to avoid the data race on lockObj
 					defer wg.Done()
@@ -813,3 +821,156 @@ func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) {
 		}
 	}
 }
+
+func TestFastPathLeaderElection(t *testing.T) {
+	objectType := "leases"
+	var (
+		lockObj    runtime.Object
+		updates    int
+		lockOps    []string
+		cancelFunc func()
+	)
+	resetVars := func() {
+		lockObj = nil
+		updates = 0
+		lockOps = []string{}
+		cancelFunc = nil
+	}
+	lec := LeaderElectionConfig{
+		LeaseDuration: 15 * time.Second,
+		RenewDeadline: 2 * time.Second,
+		RetryPeriod:   1 * time.Second,
+
+		Callbacks: LeaderCallbacks{
+			OnNewLeader:      func(identity string) {},
+			OnStoppedLeading: func() {},
+			OnStartedLeading: func(context.Context) {
+			},
+		},
+	}
+
+	tests := []struct {
+		name            string
+		reactors        []Reactor
+		expectedLockOps []string
+	}{
+		{
+			name: "Exercise fast path after lock acquired",
+			reactors: []Reactor{
+				{
+					verb:       "get",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						lockOps = append(lockOps, "get")
+						if lockObj != nil {
+							return true, lockObj, nil
+						}
+						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
+					},
+				},
+				{
+					verb:       "create",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						lockOps = append(lockOps, "create")
+						lockObj = action.(fakeclient.CreateAction).GetObject()
+						return true, lockObj, nil
+					},
+				},
+				{
+					verb:       "update",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						updates++
+						lockOps = append(lockOps, "update")
+						if updates == 2 {
+							cancelFunc()
+						}
+						lockObj = action.(fakeclient.UpdateAction).GetObject()
+						return true, lockObj, nil
+					},
+				},
+			},
+			expectedLockOps: []string{"get", "create", "update", "update"},
+		},
+		{
+			name: "Fallback to slow path after fast path fails",
+			reactors: []Reactor{
+				{
+					verb:       "get",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						lockOps = append(lockOps, "get")
+						if lockObj != nil {
+							return true, lockObj, nil
+						}
+						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
+					},
+				},
+				{
+					verb:       "create",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						lockOps = append(lockOps, "create")
+						lockObj = action.(fakeclient.CreateAction).GetObject()
+						return true, lockObj, nil
+					},
+				},
+				{
+					verb:       "update",
+					objectType: objectType,
+					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+						updates++
+						lockOps = append(lockOps, "update")
+						switch updates {
+						case 2:
+							return true, nil, errors.NewConflict(action.(fakeclient.UpdateAction).GetResource().GroupResource(), "fake conflict", nil)
+						case 4:
+							cancelFunc()
+						}
+						lockObj = action.(fakeclient.UpdateAction).GetObject()
+						return true, lockObj, nil
+					},
+				},
+			},
+			expectedLockOps: []string{"get", "create", "update", "update", "get", "update", "update"},
+		},
+	}
+
+	for i := range tests {
+		test := &tests[i]
+		t.Run(test.name, func(t *testing.T) {
+			resetVars()
+
+			recorder := record.NewFakeRecorder(100)
+			resourceLockConfig := rl.ResourceLockConfig{
+				Identity:      "baz",
+				EventRecorder: recorder,
+			}
+			c := &fake.Clientset{}
+			for _, reactor := range test.reactors {
+				c.AddReactor(reactor.verb, objectType, reactor.reaction)
+			}
+			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
+				t.Errorf("unreachable action. testclient called too many times: %+v", action)
+				return true, nil, fmt.Errorf("unreachable action")
+			})
+			lock, err := rl.New("leases", "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
+			if err != nil {
+				t.Fatal("resourcelock.New() = ", err)
+			}
+
+			lec.Lock = lock
+			elector, err := NewLeaderElector(lec)
+			if err != nil {
+				t.Fatal("Failed to create leader elector: ", err)
+			}
+
+			ctx, cancel := context.WithCancel(context.Background())
+			cancelFunc = cancel
+
+			elector.Run(ctx)
+			assert.Equal(t, test.expectedLockOps, lockOps, "Expected lock ops %q, got %q", test.expectedLockOps, lockOps)
+		})
+	}
+}
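
Reviewer note: distilled, the control flow added to tryAcquireOrRenew is an
optimistic write with a read-modify-write fallback. The sketch below restates
it with hypothetical Record and Lock types, invented purely for illustration;
the real code operates on resourcelock.Interface and *LeaderElectionRecord.

package sketch

import "context"

// Record and Lock are stand-ins for illustration only.
type Record struct {
	Holder      string
	Transitions int
}

type Lock interface {
	Get(ctx context.Context) (Record, error)
	Update(ctx context.Context, r Record) error
}

// renew mirrors the patched control flow: a current leader holding a
// still-valid lease writes once without re-reading (fast path); any write
// error falls back to read-modify-write (slow path).
func renew(ctx context.Context, l Lock, observed Record, isLeader, leaseValid bool) (Record, error) {
	if isLeader && leaseValid {
		if err := l.Update(ctx, observed); err == nil {
			return observed, nil // fast path: one API call per renewal
		}
		// A failed optimistic write (e.g. a 409 Conflict) is not fatal;
		// fall through and reconcile against the server's copy.
	}
	cur, err := l.Get(ctx)
	if err != nil {
		return Record{}, err
	}
	if !isLeader {
		cur.Transitions++ // taking over from a previous holder
	}
	if err := l.Update(ctx, cur); err != nil {
		return Record{}, err
	}
	return cur, nil
}

The fallback is safe because the slow path re-reads the server's record and
re-checks holder identity and lease validity before writing, exactly as it did
before this patch.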
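For context, here is a minimal consumer that drives this code path through the
public API; the lease name, namespace, and in-cluster config are placeholder
assumptions, not part of this patch.

package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// The holder identity just needs to be unique per candidate.
	id, _ := os.Hostname()

	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "example-lock", Namespace: "default"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { klog.Info("started leading") },
			OnStoppedLeading: func() { klog.Info("stopped leading") },
		},
	})
}

While leading, each RetryPeriod tick now issues a single Update on the Lease
instead of a Get followed by an Update, which is what the expectedLockOps
sequences in TestFastPathLeaderElection assert.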