leaderelection: optimistically update leader lock

Signed-off-by: Eric Lin <exlin@google.com>
Eric Lin 2023-11-25 22:04:32 +00:00
parent d61cbac69a
commit 1d9f7fd516
2 changed files with 188 additions and 9 deletions

leaderelection.go

@@ -325,7 +325,22 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
AcquireTime: now,
}
// 1. obtain or create the ElectionRecord
// 1. fast path for the leader to update optimistically assuming that the record observed
// last time is the current version.
if le.IsLeader() && le.isLeaseValid(now.Time) {
oldObservedRecord := le.getObservedRecord()
leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
err := le.config.Lock.Update(ctx, leaderElectionRecord)
if err == nil {
le.setObservedRecord(&leaderElectionRecord)
return true
}
klog.Errorf("Failed to update lock optimitically: %v, falling back to slow path", err)
}
// 2. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
@@ -342,24 +357,23 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
return true
}
// 2. Record obtained, check the Identity & Time
// 3. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
!le.IsLeader() {
if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
// 4. We're going to try to update. The leaderElectionRecord is set to its default
// here. Let's correct it before updating.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
le.metrics.slowpathExercised(le.config.Name)
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
@@ -400,6 +414,10 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
return nil
}
func (le *LeaderElector) isLeaseValid(now time.Time) bool {
return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
}
// setObservedRecord will set a new observedRecord and update observedTime to the current time.
// Protect critical sections with lock.
func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
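Taken together, the change gives the current holder a fast path: while its lease is still valid it renews by writing the record it last observed, and only when that optimistic write fails does it fall back to the pre-existing Get-then-Update slow path (now instrumented with the slowpathExercised metric). Below is a minimal, runnable sketch of that flow; the types Record, Lock, Elector, and memLock are simplified stand-ins for illustration, not the real resourcelock API.

```go
// Sketch of the optimistic-renewal pattern, with simplified stand-in types.
package main

import (
	"context"
	"fmt"
	"time"
)

type Record struct {
	Holder      string
	AcquireTime time.Time
	RenewTime   time.Time
	Lease       time.Duration
	Transitions int
}

// Lock is a stand-in for the resourcelock interface used above.
type Lock interface {
	Get(ctx context.Context) (Record, error)
	Update(ctx context.Context, r Record) error
}

type Elector struct {
	ID           string
	Lock         Lock
	observed     Record
	observedTime time.Time
}

func (e *Elector) isLeader() bool { return e.observed.Holder == e.ID }

func (e *Elector) leaseValid(now time.Time) bool {
	return e.observedTime.Add(e.observed.Lease).After(now)
}

// tryAcquireOrRenew mirrors the structure of the patched method:
// optimistic Update first, Get+Update only when the fast path is
// unavailable or fails.
func (e *Elector) tryAcquireOrRenew(ctx context.Context) bool {
	now := time.Now()
	rec := Record{Holder: e.ID, RenewTime: now, Lease: 15 * time.Second} // lease duration hardcoded for brevity

	// 1. Fast path: the current holder assumes the record it observed last
	//    time is still the latest version and writes the renewal directly.
	if e.isLeader() && e.leaseValid(now) {
		rec.AcquireTime = e.observed.AcquireTime
		rec.Transitions = e.observed.Transitions
		if err := e.Lock.Update(ctx, rec); err == nil {
			e.observed, e.observedTime = rec, now
			return true
		}
		// The optimistic write failed (for example a conflict); fall through.
	}

	// 2. Slow path: re-read the record before deciding what to write.
	cur, err := e.Lock.Get(ctx)
	if err != nil {
		return false
	}
	if cur.Holder != "" && cur.Holder != e.ID && e.observedTime.Add(cur.Lease).After(now) {
		return false // held by someone else and not yet expired
	}
	if cur.Holder == e.ID {
		rec.AcquireTime, rec.Transitions = cur.AcquireTime, cur.Transitions
	} else {
		rec.AcquireTime, rec.Transitions = now, cur.Transitions+1
	}
	if err := e.Lock.Update(ctx, rec); err != nil {
		return false
	}
	e.observed, e.observedTime = rec, now
	return true
}

// memLock is an in-memory Lock used only to make the sketch runnable.
type memLock struct{ rec Record }

func (m *memLock) Get(context.Context) (Record, error)      { return m.rec, nil }
func (m *memLock) Update(_ context.Context, r Record) error { m.rec = r; return nil }

func main() {
	e := &Elector{ID: "node-a", Lock: &memLock{}}
	fmt.Println("acquired (slow path):", e.tryAcquireOrRenew(context.Background()))
	fmt.Println("renewed (fast path): ", e.tryAcquireOrRenew(context.Background()))
}
```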

leaderelection_test.go

@@ -610,14 +610,18 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
// Skip initial two fast path renews
if updates%2 == 1 && updates < 5 {
return true, nil, context.Canceled
}
// Second update (first renew) should return our canceled error
// FakeClient doesn't do anything with the context so we're doing this ourselves
if updates == 2 {
if updates == 4 {
close(onRenewCalled)
<-onRenewResume
return true, nil, context.Canceled
} else if updates == 3 {
} else if updates == 5 {
// We update the lock after the cancellation to release it
// This wg is to avoid the data race on lockObj
defer wg.Done()
@@ -668,8 +672,12 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
// Always skip fast path renew
if updates%2 == 1 {
return true, nil, context.Canceled
}
// Second update (first renew) should release the lock
if updates == 2 {
if updates == 4 {
// We update the lock after the cancellation to release it
// This wg is to avoid the data race on lockObj
defer wg.Done()
@@ -813,3 +821,156 @@ func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) {
}
}
}
func TestFastPathLeaderElection(t *testing.T) {
objectType := "leases"
var (
lockObj runtime.Object
updates int
lockOps []string
cancelFunc func()
)
resetVars := func() {
lockObj = nil
updates = 0
lockOps = []string{}
cancelFunc = nil
}
lec := LeaderElectionConfig{
LeaseDuration: 15 * time.Second,
RenewDeadline: 2 * time.Second,
RetryPeriod: 1 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(identity string) {},
OnStoppedLeading: func() {},
OnStartedLeading: func(context.Context) {
},
},
}
tests := []struct {
name string
reactors []Reactor
expectedLockOps []string
}{
{
name: "Exercise fast path after lock acquired",
reactors: []Reactor{
{
verb: "get",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockOps = append(lockOps, "get")
if lockObj != nil {
return true, lockObj, nil
}
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockOps = append(lockOps, "create")
lockObj = action.(fakeclient.CreateAction).GetObject()
return true, lockObj, nil
},
},
{
verb: "update",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
lockOps = append(lockOps, "update")
if updates == 2 {
cancelFunc()
}
lockObj = action.(fakeclient.UpdateAction).GetObject()
return true, lockObj, nil
},
},
},
expectedLockOps: []string{"get", "create", "update", "update"},
},
{
name: "Fallback to slow path after fast path fails",
reactors: []Reactor{
{
verb: "get",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockOps = append(lockOps, "get")
if lockObj != nil {
return true, lockObj, nil
}
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockOps = append(lockOps, "create")
lockObj = action.(fakeclient.CreateAction).GetObject()
return true, lockObj, nil
},
},
{
verb: "update",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
lockOps = append(lockOps, "update")
switch updates {
case 2:
return true, nil, errors.NewConflict(action.(fakeclient.UpdateAction).GetResource().GroupResource(), "fake conflict", nil)
case 4:
cancelFunc()
}
lockObj = action.(fakeclient.UpdateAction).GetObject()
return true, lockObj, nil
},
},
},
expectedLockOps: []string{"get", "create", "update", "update", "get", "update", "update"},
},
}
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
resetVars()
recorder := record.NewFakeRecorder(100)
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: recorder,
}
c := &fake.Clientset{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
lock, err := rl.New("leases", "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
if err != nil {
t.Fatal("resourcelock.New() = ", err)
}
lec.Lock = lock
elector, err := NewLeaderElector(lec)
if err != nil {
t.Fatal("Failed to create leader elector: ", err)
}
ctx, cancel := context.WithCancel(context.Background())
cancelFunc = cancel
elector.Run(ctx)
assert.Equal(t, test.expectedLockOps, lockOps, "Expected lock ops %q, got %q", test.expectedLockOps, lockOps)
})
}
}
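For context on how this path is reached outside the fake-client tests, here is a hedged caller-side sketch: a controller that takes the lease through the unchanged public API (resourcelock.New plus leaderelection.RunOrDie) now renews with a single Update per retry period instead of Get+Update while it remains the leader. The namespace ("kube-system"), lease name ("my-controller"), and identity below are placeholders, and the sketch assumes an in-cluster kubeconfig.

```go
// Caller-side sketch of leader election over a Lease lock; the fast path is
// internal to LeaderElector, so callers need no changes to benefit from it.
package main

import (
	"context"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Placeholder namespace, lease name, and identity.
	lock, err := rl.New(rl.LeasesResourceLock, "kube-system", "my-controller",
		client.CoreV1(), client.CoordinationV1(),
		rl.ResourceLockConfig{Identity: "my-controller-pod-1"})
	if err != nil {
		klog.Fatal(err)
	}

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { klog.Info("started leading") },
			OnStoppedLeading: func() { klog.Info("stopped leading") },
			OnNewLeader:      func(id string) { klog.Infof("current leader: %s", id) },
		},
	})
}
```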