diff --git a/tools/leaderelection/leaderelection.go b/tools/leaderelection/leaderelection.go index 3f6b898e..55f34fd3 100644 --- a/tools/leaderelection/leaderelection.go +++ b/tools/leaderelection/leaderelection.go @@ -56,6 +56,7 @@ import ( "bytes" "context" "fmt" + "sync" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -187,6 +188,9 @@ type LeaderElector struct { // clock is wrapper around time to allow for less flaky testing clock clock.Clock + // used to lock the observedRecord + observedRecordLock sync.Mutex + metrics leaderMetricsAdapter } @@ -224,13 +228,14 @@ func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { // GetLeader returns the identity of the last observed leader or returns the empty string if // no leader has yet been observed. +// This function is for informational purposes. (e.g. monitoring, logs, etc.) func (le *LeaderElector) GetLeader() string { - return le.observedRecord.HolderIdentity + return le.getObservedRecord().HolderIdentity } // IsLeader returns true if the last observed leader was this client else returns false. func (le *LeaderElector) IsLeader() bool { - return le.observedRecord.HolderIdentity == le.config.Lock.Identity() + return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity() } // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. @@ -301,8 +306,8 @@ func (le *LeaderElector) release() bool { klog.Errorf("Failed to release lock: %v", err) return false } - le.observedRecord = leaderElectionRecord - le.observedTime = le.clock.Now() + + le.setObservedRecord(&leaderElectionRecord) return true } @@ -329,16 +334,17 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { klog.Errorf("error initially creating leader election record: %v", err) return false } - le.observedRecord = leaderElectionRecord - le.observedTime = le.clock.Now() + + le.setObservedRecord(&leaderElectionRecord) + return true } // 2. Record obtained, check the Identity & Time if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) { - le.observedRecord = *oldLeaderElectionRecord + le.setObservedRecord(oldLeaderElectionRecord) + le.observedRawRecord = oldLeaderElectionRawRecord - le.observedTime = le.clock.Now() } if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && @@ -362,8 +368,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { return false } - le.observedRecord = leaderElectionRecord - le.observedTime = le.clock.Now() + le.setObservedRecord(&leaderElectionRecord) return true } @@ -392,3 +397,22 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error { return nil } + +// setObservedRecord will set a new observedRecord and update observedTime to the current time. +// Protect critical sections with lock. +func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) { + le.observedRecordLock.Lock() + defer le.observedRecordLock.Unlock() + + le.observedRecord = *observedRecord + le.observedTime = le.clock.Now() +} + +// getObservedRecord returns observersRecord. +// Protect critical sections with lock. +func (le *LeaderElector) getObservedRecord() rl.LeaderElectionRecord { + le.observedRecordLock.Lock() + defer le.observedRecordLock.Unlock() + + return le.observedRecord +}