mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-27 07:28:14 +00:00
Fix data race for leaderelection package
Kubernetes-commit: 8eda5521c041bbcd5ad19b3cfff49202e7eb5d22
This commit is contained in:
parent
c36b96a74b
commit
b4027a90c4
@ -56,6 +56,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
@ -187,6 +188,9 @@ type LeaderElector struct {
|
|||||||
// clock is wrapper around time to allow for less flaky testing
|
// clock is wrapper around time to allow for less flaky testing
|
||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
|
|
||||||
|
// used to lock the observedRecord
|
||||||
|
observedRecordLock sync.Mutex
|
||||||
|
|
||||||
metrics leaderMetricsAdapter
|
metrics leaderMetricsAdapter
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,13 +228,14 @@ func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
|
|||||||
|
|
||||||
// GetLeader returns the identity of the last observed leader or returns the empty string if
|
// GetLeader returns the identity of the last observed leader or returns the empty string if
|
||||||
// no leader has yet been observed.
|
// no leader has yet been observed.
|
||||||
|
// This function is for informational purposes. (e.g. monitoring, logs, etc.)
|
||||||
func (le *LeaderElector) GetLeader() string {
|
func (le *LeaderElector) GetLeader() string {
|
||||||
return le.observedRecord.HolderIdentity
|
return le.getObservedRecord().HolderIdentity
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsLeader returns true if the last observed leader was this client else returns false.
|
// IsLeader returns true if the last observed leader was this client else returns false.
|
||||||
func (le *LeaderElector) IsLeader() bool {
|
func (le *LeaderElector) IsLeader() bool {
|
||||||
return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
|
return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
|
||||||
}
|
}
|
||||||
|
|
||||||
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
|
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
|
||||||
@ -301,8 +306,8 @@ func (le *LeaderElector) release() bool {
|
|||||||
klog.Errorf("Failed to release lock: %v", err)
|
klog.Errorf("Failed to release lock: %v", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
le.observedRecord = leaderElectionRecord
|
|
||||||
le.observedTime = le.clock.Now()
|
le.setObservedRecord(&leaderElectionRecord)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -329,16 +334,17 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
|
|||||||
klog.Errorf("error initially creating leader election record: %v", err)
|
klog.Errorf("error initially creating leader election record: %v", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
le.observedRecord = leaderElectionRecord
|
|
||||||
le.observedTime = le.clock.Now()
|
le.setObservedRecord(&leaderElectionRecord)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Record obtained, check the Identity & Time
|
// 2. Record obtained, check the Identity & Time
|
||||||
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
|
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
|
||||||
le.observedRecord = *oldLeaderElectionRecord
|
le.setObservedRecord(oldLeaderElectionRecord)
|
||||||
|
|
||||||
le.observedRawRecord = oldLeaderElectionRawRecord
|
le.observedRawRecord = oldLeaderElectionRawRecord
|
||||||
le.observedTime = le.clock.Now()
|
|
||||||
}
|
}
|
||||||
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
|
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
|
||||||
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
|
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
|
||||||
@ -362,8 +368,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
le.observedRecord = leaderElectionRecord
|
le.setObservedRecord(&leaderElectionRecord)
|
||||||
le.observedTime = le.clock.Now()
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -392,3 +397,22 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setObservedRecord will set a new observedRecord and update observedTime to the current time.
|
||||||
|
// Protect critical sections with lock.
|
||||||
|
func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
|
||||||
|
le.observedRecordLock.Lock()
|
||||||
|
defer le.observedRecordLock.Unlock()
|
||||||
|
|
||||||
|
le.observedRecord = *observedRecord
|
||||||
|
le.observedTime = le.clock.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// getObservedRecord returns observersRecord.
|
||||||
|
// Protect critical sections with lock.
|
||||||
|
func (le *LeaderElector) getObservedRecord() rl.LeaderElectionRecord {
|
||||||
|
le.observedRecordLock.Lock()
|
||||||
|
defer le.observedRecordLock.Unlock()
|
||||||
|
|
||||||
|
return le.observedRecord
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user