Merge pull request #114872 from Iceber/fix_acquire_leader

client-go: fix the wait time for trying to acquire the leader lease

Kubernetes-commit: 6e213e7390b60c6db85210f322a213768cefb172
This commit is contained in:
Kubernetes Publisher 2023-02-24 07:17:34 -08:00
commit 6e69fba05a
2 changed files with 47 additions and 10 deletions

View File

@ -292,7 +292,7 @@ func (le *LeaderElector) release() bool {
if !le.IsLeader() { if !le.IsLeader() {
return true return true
} }
now := metav1.Now() now := metav1.NewTime(le.clock.Now())
leaderElectionRecord := rl.LeaderElectionRecord{ leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions, LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaseDurationSeconds: 1, LeaseDurationSeconds: 1,
@ -312,7 +312,7 @@ func (le *LeaderElector) release() bool {
// else it tries to renew the lease if it has already been acquired. Returns true // else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false. // on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now() now := metav1.NewTime(le.clock.Now())
leaderElectionRecord := rl.LeaderElectionRecord{ leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(), HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
@ -344,7 +344,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
le.observedRawRecord = oldLeaderElectionRawRecord le.observedRawRecord = oldLeaderElectionRawRecord
} }
if len(oldLeaderElectionRecord.HolderIdentity) > 0 && if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && le.observedTime.Add(time.Second*time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).After(now.Time) &&
!le.IsLeader() { !le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false return false

View File

@ -74,13 +74,15 @@ type Reactor struct {
} }
func testTryAcquireOrRenew(t *testing.T, objectType string) { func testTryAcquireOrRenew(t *testing.T, objectType string) {
future := time.Now().Add(1000 * time.Hour) clock := clock.RealClock{}
past := time.Now().Add(-1000 * time.Hour) future := clock.Now().Add(1000 * time.Hour)
past := clock.Now().Add(-1000 * time.Hour)
tests := []struct { tests := []struct {
name string name string
observedRecord rl.LeaderElectionRecord observedRecord rl.LeaderElectionRecord
observedTime time.Time observedTime time.Time
retryAfter time.Duration
reactors []Reactor reactors []Reactor
expectedEvents []string expectedEvents []string
@ -127,6 +129,33 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
transitionLeader: true, transitionLeader: true,
outHolder: "baz", outHolder: "baz",
}, },
{
name: "acquire from led object with the lease duration seconds",
reactors: []Reactor{
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
},
},
{
verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
},
},
{
verb: "update",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
retryAfter: 3 * time.Second,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{ {
name: "acquire from unled object", name: "acquire from unled object",
reactors: []Reactor{ reactors: []Reactor{
@ -283,11 +312,18 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
observedRecord: test.observedRecord, observedRecord: test.observedRecord,
observedRawRecord: observedRawRecord, observedRawRecord: observedRawRecord,
observedTime: test.observedTime, observedTime: test.observedTime,
clock: clock.RealClock{}, clock: clock,
} }
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
if test.retryAfter != 0 {
time.Sleep(test.retryAfter)
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) { if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
} }
} else {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
}
le.observedRecord.AcquireTime = metav1.Time{} le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{} le.observedRecord.RenewTime = metav1.Time{}
@ -378,8 +414,9 @@ func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRec
} }
func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) { func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
future := time.Now().Add(1000 * time.Hour) clock := clock.RealClock{}
past := time.Now().Add(-1000 * time.Hour) future := clock.Now().Add(1000 * time.Hour)
past := clock.Now().Add(-1000 * time.Hour)
primaryType, secondaryType := multiLockType(t, objectType) primaryType, secondaryType := multiLockType(t, objectType)
tests := []struct { tests := []struct {
name string name string
@ -838,7 +875,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
observedRecord: test.observedRecord, observedRecord: test.observedRecord,
observedRawRecord: test.observedRawRecord, observedRawRecord: test.observedRawRecord,
observedTime: test.observedTime, observedTime: test.observedTime,
clock: clock.RealClock{}, clock: clock,
} }
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) { if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)