leaderelection: optimistically update leader lock

Signed-off-by: Eric Lin <exlin@google.com>

Kubernetes-commit: 1d9f7fd516b4787f5ef32692711d5ae3031e794e
This commit is contained in:
Eric Lin
2023-11-25 22:04:32 +00:00
committed by Kubernetes Publisher
parent feecac4b44
commit 3c7c00d2d6
2 changed files with 188 additions and 9 deletions

View File

@@ -325,7 +325,22 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
AcquireTime: now,
}
// 1. obtain or create the ElectionRecord
// 1. fast path for the leader to update optimistically assuming that the record observed
// last time is the current version.
if le.IsLeader() && le.isLeaseValid(now.Time) {
oldObservedRecord := le.getObservedRecord()
leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
err := le.config.Lock.Update(ctx, leaderElectionRecord)
if err == nil {
le.setObservedRecord(&leaderElectionRecord)
return true
}
klog.Errorf("Failed to update lock optimitically: %v, falling back to slow path", err)
}
// 2. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
@@ -342,24 +357,23 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
return true
}
// 2. Record obtained, check the Identity & Time
// 3. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
!le.IsLeader() {
if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
// 4. We're going to try to update. The leaderElectionRecord is set to it's default
// here. Let's correct it before updating.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
le.metrics.slowpathExercised(le.config.Name)
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
@@ -400,6 +414,10 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
return nil
}
func (le *LeaderElector) isLeaseValid(now time.Time) bool {
return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
}
// setObservedRecord will set a new observedRecord and update observedTime to the current time.
// Protect critical sections with lock.
func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {