Merge pull request #122069 from linxiulei/le_opt0

Optimistically update leader lock
Kubernetes Prow Robot authored on 2023-12-14 05:10:46 +01:00; committed by GitHub
commit 583a79a456
4 changed files with 230 additions and 25 deletions
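The change in brief: renewal previously always issued a Get of the lock object followed by an Update; with this patch the current holder skips the Get and optimistically Updates based on the record it last observed, falling back to the Get-then-Update slow path on any error. A self-contained toy sketch of that pattern (the store type and version numbers are illustrative, not client-go code):

package main

import (
	"errors"
	"fmt"
)

// store stands in for the lock object; update mirrors apiserver optimistic
// concurrency by succeeding only when the caller's cached version matches.
type store struct {
	version int
	holder  string
}

var errConflict = errors.New("conflict: stale version")

func (s *store) update(cachedVersion int, holder string) error {
	if cachedVersion != s.version {
		return errConflict
	}
	s.version++
	s.holder = holder
	return nil
}

func main() {
	s := &store{version: 7, holder: "me"}
	cached := 7 // the version observed at the last successful renewal

	// Fast path: write against the cached version, one round trip.
	if err := s.update(cached, "me"); err == nil {
		fmt.Println("fast-path renewal succeeded")
		return
	}
	// Slow path: re-read the current version, then write, two round trips.
	cached = s.version
	if err := s.update(cached, "me"); err == nil {
		fmt.Println("fell back to slow path")
	}
}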


@@ -325,7 +325,22 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
AcquireTime: now,
}
// 1. obtain or create the ElectionRecord
// 1. fast path for the leader to update optimistically assuming that the record observed
// last time is the current version.
if le.IsLeader() && le.isLeaseValid(now.Time) {
oldObservedRecord := le.getObservedRecord()
leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
err := le.config.Lock.Update(ctx, leaderElectionRecord)
if err == nil {
le.setObservedRecord(&leaderElectionRecord)
return true
}
klog.Errorf("Failed to update lock optimitically: %v, falling back to slow path", err)
}
// 2. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
@@ -342,24 +357,23 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
return true
}
// 2. Record obtained, check the Identity & Time
// 3. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
!le.IsLeader() {
if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 3. We're going to try to update. The leaderElectionRecord is set to its default
// 4. We're going to try to update. The leaderElectionRecord is set to its default
// here. Let's correct it before updating.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
le.metrics.slowpathExercised(le.config.Name)
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
@@ -400,6 +414,10 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
return nil
}
func (le *LeaderElector) isLeaseValid(now time.Time) bool {
return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
}
// setObservedRecord will set a new observedRecord and update observedTime to the current time.
// Protect critical sections with lock.
func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
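The isLeaseValid check above is plain clock arithmetic; a standalone illustration with made-up numbers (not part of the patch):

package main

import (
	"fmt"
	"time"
)

func main() {
	// A record last observed 10s ago with a 15s lease is valid for
	// another 5s, so the leader may take the fast path.
	observedTime := time.Now().Add(-10 * time.Second)
	leaseDurationSeconds := 15 // LeaderElectionRecord.LeaseDurationSeconds
	expiry := observedTime.Add(time.Second * time.Duration(leaseDurationSeconds))
	fmt.Println(expiry.After(time.Now())) // true: the lease has not expired
}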


@@ -315,6 +315,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
observedRawRecord: observedRawRecord,
observedTime: test.observedTime,
clock: clock,
metrics: globalMetricsFactory.newLeaderMetrics(),
}
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
if test.retryAfter != 0 {
@@ -491,6 +492,7 @@ func testReleaseLease(t *testing.T, objectType string) {
observedRawRecord: observedRawRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
metrics: globalMetricsFactory.newLeaderMetrics(),
}
if !le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
@@ -610,14 +612,18 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
// Skip initial two fast path renews
if updates%2 == 1 && updates < 5 {
return true, nil, context.Canceled
}
// The fourth update (the second slow-path renew) should return our canceled error
// FakeClient doesn't do anything with the context so we're doing this ourselves
if updates == 2 {
if updates == 4 {
close(onRenewCalled)
<-onRenewResume
return true, nil, context.Canceled
} else if updates == 3 {
} else if updates == 5 {
// We update the lock after the cancellation to release it
// This wg is to avoid the data race on lockObj
defer wg.Done()
@@ -668,8 +674,12 @@ func testReleaseOnCancellation(t *testing.T, objectType string) {
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
// Always skip fast path renew
if updates%2 == 1 {
return true, nil, context.Canceled
}
// The fourth update should release the lock (the odd-numbered updates are the skipped fast-path renews)
if updates == 2 {
if updates == 4 {
// We update the lock after the cancellation to release it
// This wg is to avoid the data race on lockObj
defer wg.Done()
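A note on the two reactors above: because tryAcquireOrRenew now attempts a fast-path Update before falling back to Get-plus-Update, these tests see an extra Update call per renew cycle; returning context.Canceled on the odd-numbered (fast-path) attempts forces each renew onto the slow path, which is why the asserted update indices shift from 2 and 3 to 4 and 5.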
@@ -813,3 +823,156 @@ func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) {
}
}
}
func TestFastPathLeaderElection(t *testing.T) {
objectType := "leases"
var (
lockObj runtime.Object
updates int
lockOps []string
cancelFunc func()
)
resetVars := func() {
lockObj = nil
updates = 0
lockOps = []string{}
cancelFunc = nil
}
lec := LeaderElectionConfig{
LeaseDuration: 15 * time.Second,
RenewDeadline: 2 * time.Second,
RetryPeriod: 1 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(identity string) {},
OnStoppedLeading: func() {},
OnStartedLeading: func(context.Context) {
},
},
}
tests := []struct {
name string
reactors []Reactor
expectedLockOps []string
}{
{
name: "Exercise fast path after lock acquired",
reactors: []Reactor{
{
verb: "get",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockOps = append(lockOps, "get")
if lockObj != nil {
return true, lockObj, nil
}
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockOps = append(lockOps, "create")
lockObj = action.(fakeclient.CreateAction).GetObject()
return true, lockObj, nil
},
},
{
verb: "update",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
lockOps = append(lockOps, "update")
if updates == 2 {
cancelFunc()
}
lockObj = action.(fakeclient.UpdateAction).GetObject()
return true, lockObj, nil
},
},
},
expectedLockOps: []string{"get", "create", "update", "update"},
},
{
name: "Fallback to slow path after fast path fails",
reactors: []Reactor{
{
verb: "get",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockOps = append(lockOps, "get")
if lockObj != nil {
return true, lockObj, nil
}
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
lockOps = append(lockOps, "create")
lockObj = action.(fakeclient.CreateAction).GetObject()
return true, lockObj, nil
},
},
{
verb: "update",
objectType: objectType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
updates++
lockOps = append(lockOps, "update")
switch updates {
case 2:
return true, nil, errors.NewConflict(action.(fakeclient.UpdateAction).GetResource().GroupResource(), "fake conflict", nil)
case 4:
cancelFunc()
}
lockObj = action.(fakeclient.UpdateAction).GetObject()
return true, lockObj, nil
},
},
},
expectedLockOps: []string{"get", "create", "update", "update", "get", "update", "update"},
},
}
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
resetVars()
recorder := record.NewFakeRecorder(100)
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: recorder,
}
c := &fake.Clientset{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
lock, err := rl.New("leases", "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
if err != nil {
t.Fatal("resourcelock.New() = ", err)
}
lec.Lock = lock
elector, err := NewLeaderElector(lec)
if err != nil {
t.Fatal("Failed to create leader elector: ", err)
}
ctx, cancel := context.WithCancel(context.Background())
cancelFunc = cancel
elector.Run(ctx)
assert.Equal(t, test.expectedLockOps, lockOps, "Expected lock ops %q, got %q", test.expectedLockOps, lockOps)
})
}
}
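Reading the expectedLockOps sequences: initial acquisition always takes the slow path (a "get" that fails with NotFound, then a "create"); each subsequent fast-path renewal is a single "update" with no preceding "get"; and in the fallback case the conflicting second "update" costs one slow-path round trip ("get" then "update") before fast-path renewals resume.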


@@ -26,24 +26,26 @@ import (
type leaderMetricsAdapter interface {
leaderOn(name string)
leaderOff(name string)
slowpathExercised(name string)
}
// GaugeMetric represents a single numerical value that can arbitrarily go up
// and down.
type SwitchMetric interface {
// LeaderMetric instruments metrics used in leader election.
type LeaderMetric interface {
On(name string)
Off(name string)
SlowpathExercised(name string)
}
type noopMetric struct{}
func (noopMetric) On(name string) {}
func (noopMetric) Off(name string) {}
func (noopMetric) SlowpathExercised(name string) {}
// defaultLeaderMetrics expects the caller to lock before setting any metrics.
type defaultLeaderMetrics struct {
// leader's value indicates whether the current process is the owner of the named lease
leader SwitchMetric
leader LeaderMetric
}
func (m *defaultLeaderMetrics) leaderOn(name string) {
@@ -60,19 +62,27 @@ func (m *defaultLeaderMetrics) leaderOff(name string) {
m.leader.Off(name)
}
func (m *defaultLeaderMetrics) slowpathExercised(name string) {
if m == nil {
return
}
m.leader.SlowpathExercised(name)
}
type noMetrics struct{}
func (noMetrics) leaderOn(name string) {}
func (noMetrics) leaderOff(name string) {}
func (noMetrics) slowpathExercised(name string) {}
// MetricsProvider generates various metrics used by the leader election.
type MetricsProvider interface {
NewLeaderMetric() SwitchMetric
NewLeaderMetric() LeaderMetric
}
type noopMetricsProvider struct{}
func (_ noopMetricsProvider) NewLeaderMetric() SwitchMetric {
func (noopMetricsProvider) NewLeaderMetric() LeaderMetric {
return noopMetric{}
}
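For consumers that wire in their own metrics stack, adopting the renamed interface is mechanical. A minimal sketch against the post-change client-go surface, using hypothetical logging stand-ins:

package main

import (
	"log"

	"k8s.io/client-go/tools/leaderelection"
)

// logMetric is a hypothetical LeaderMetric; On, Off and SlowpathExercised
// are the full surface a provider must now satisfy.
type logMetric struct{}

func (logMetric) On(name string)                { log.Printf("leading %q", name) }
func (logMetric) Off(name string)               { log.Printf("stopped leading %q", name) }
func (logMetric) SlowpathExercised(name string) { log.Printf("slow path exercised for %q", name) }

type logProvider struct{}

func (logProvider) NewLeaderMetric() leaderelection.LeaderMetric { return logMetric{} }

func main() {
	// Register before constructing any LeaderElector so electors pick up
	// the custom metrics.
	leaderelection.SetProvider(logProvider{})
}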


@@ -28,27 +28,41 @@ var (
StabilityLevel: k8smetrics.ALPHA,
Help: "Gauge of if the reporting system is master of the relevant lease, 0 indicates backup, 1 indicates master. 'name' is the string used to identify the lease. Please make sure to group by name.",
}, []string{"name"})
// A cumulative counter should be sufficient to get a rough ratio of slow-path
// renewals, given that the leader election frequency is specified explicitly;
// this avoids the overhead of also reporting a counter for every fast-path renewal.
leaderSlowpathCounter = k8smetrics.NewCounterVec(&k8smetrics.CounterOpts{
Name: "leader_election_slowpath_total",
StabilityLevel: k8smetrics.ALPHA,
Help: "Total number of slow path exercised in renewing leader leases. 'name' is the string used to identify the lease. Please make sure to group by name.",
}, []string{"name"})
)
func init() {
legacyregistry.MustRegister(leaderGauge)
legacyregistry.MustRegister(leaderSlowpathCounter)
leaderelection.SetProvider(prometheusMetricsProvider{})
}
type prometheusMetricsProvider struct{}
func (prometheusMetricsProvider) NewLeaderMetric() leaderelection.SwitchMetric {
return &switchAdapter{gauge: leaderGauge}
func (prometheusMetricsProvider) NewLeaderMetric() leaderelection.LeaderMetric {
return &leaderAdapter{gauge: leaderGauge, counter: leaderSlowpathCounter}
}
type switchAdapter struct {
type leaderAdapter struct {
gauge *k8smetrics.GaugeVec
counter *k8smetrics.CounterVec
}
func (s *switchAdapter) On(name string) {
func (s *leaderAdapter) On(name string) {
s.gauge.WithLabelValues(name).Set(1.0)
}
func (s *switchAdapter) Off(name string) {
func (s *leaderAdapter) Off(name string) {
s.gauge.WithLabelValues(name).Set(0.0)
}
func (s *leaderAdapter) SlowpathExercised(name string) {
s.counter.WithLabelValues(name).Inc()
}
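Since a healthy leader renews once per RetryPeriod and a renewal normally costs a single fast-path update, comparing rate(leader_election_slowpath_total{name="..."}[5m]) against 1/RetryPeriod gives the rough slow-path ratio the comment above alludes to, with no need for a companion fast-path counter.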