Merge pull request #80954 from zachomedia/fix-lock-release

Fix leader election lock release when using LeaseLocks
Kubernetes Prow Robot 2020-10-26 20:57:58 -07:00 committed by GitHub
commit 68f6b09e80
2 changed files with 160 additions and 1 deletion
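For context, this is the path the change affects: when a LeaderElectionConfig sets ReleaseOnCancel, cancelling the election context makes the elector call release(), which writes an "empty" record back to the lock so another candidate can take over immediately. Below is a minimal sketch of that setup; the package name, namespace, lock name, and timings are illustrative and not taken from this PR.

```go
package example

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithLeaseLock is a hypothetical wrapper; the clientset and identity would
// come from the caller's real configuration.
func runWithLeaseLock(ctx context.Context, client kubernetes.Interface, id string) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Namespace: "default", Name: "example-lock"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		// With ReleaseOnCancel, cancelling ctx triggers release(); that final
		// update is what this PR fixes for Lease-based locks.
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* leader work */ },
			OnStoppedLeading: func() { /* leadership lost or released */ },
		},
	})
}
```

Endpoints and ConfigMaps locks accepted the sparse record that release() used to write; per the PR title, the release only failed when the lock was a Lease.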


@@ -290,8 +290,12 @@ func (le *LeaderElector) release() bool {
 	if !le.IsLeader() {
 		return true
 	}
+	now := metav1.Now()
 	leaderElectionRecord := rl.LeaderElectionRecord{
-		LeaderTransitions: le.observedRecord.LeaderTransitions,
+		LeaderTransitions:    le.observedRecord.LeaderTransitions,
+		LeaseDurationSeconds: 1,
+		RenewTime:            now,
+		AcquireTime:          now,
 	}
 	if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
 		klog.Errorf("Failed to release lock: %v", err)
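Why the sparse record breaks specifically for Lease locks (my reading; the PR itself does not spell this out): LeaseLock.Update converts the LeaderElectionRecord into a coordination/v1 LeaseSpec via the exported LeaderElectionRecordToLeaseSpec helper, and a record without LeaseDurationSeconds yields spec.leaseDurationSeconds == 0, which the Lease API rejects as it requires a positive value, so the releasing update never goes through and the holder identity stays set. A small sketch, with a placeholder package name and transition count:

```go
package example

import (
	"fmt"

	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// showReleaseSpec is illustrative only.
func showReleaseSpec() {
	// The pre-fix release() populated nothing but LeaderTransitions.
	sparse := rl.LeaderElectionRecord{LeaderTransitions: 2}
	spec := rl.LeaderElectionRecordToLeaseSpec(&sparse)

	// leaseDurationSeconds comes out as 0, which the Lease API treats as
	// invalid, so an Update carrying this spec is rejected and the old
	// holder identity is never cleared.
	fmt.Printf("holderIdentity=%q leaseDurationSeconds=%d\n",
		*spec.HolderIdentity, *spec.LeaseDurationSeconds)
}
```

Setting LeaseDurationSeconds to 1 and stamping RenewTime/AcquireTime, as the hunk above does, produces a spec the API server accepts while still clearing the holder identity.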


@@ -917,3 +917,158 @@ func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) {
func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) {
	testTryAcquireOrRenewMultiLock(t, "configmapsleases")
}

func testReleaseLease(t *testing.T, objectType string) {
	tests := []struct {
		name             string
		observedRecord   rl.LeaderElectionRecord
		observedTime     time.Time
		reactors         []Reactor
		expectSuccess    bool
		transitionLeader bool
		outHolder        string
	}{
		{
			name: "release acquired lock from no object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.CreateAction).GetObject(), nil
					},
				},
				{
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			expectSuccess: true,
			outHolder:     "",
		},
	}

	for i := range tests {
		test := &tests[i]
		t.Run(test.name, func(t *testing.T) {
			// OnNewLeader is called async so we have to wait for it.
			var wg sync.WaitGroup
			wg.Add(1)
			var reportedLeader string
			var lock rl.Interface

			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
			resourceLockConfig := rl.ResourceLockConfig{
				Identity:      "baz",
				EventRecorder: &record.FakeRecorder{},
			}
			c := &fake.Clientset{}
			for _, reactor := range test.reactors {
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
			}
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
				return true, nil, fmt.Errorf("unreachable action")
			})

			switch objectType {
			case "endpoints":
				lock = &rl.EndpointsLock{
					EndpointsMeta: objectMeta,
					LockConfig:    resourceLockConfig,
					Client:        c.CoreV1(),
				}
			case "configmaps":
				lock = &rl.ConfigMapLock{
					ConfigMapMeta: objectMeta,
					LockConfig:    resourceLockConfig,
					Client:        c.CoreV1(),
				}
			case "leases":
				lock = &rl.LeaseLock{
					LeaseMeta:  objectMeta,
					LockConfig: resourceLockConfig,
					Client:     c.CoordinationV1(),
				}
			}

			lec := LeaderElectionConfig{
				Lock:          lock,
				LeaseDuration: 10 * time.Second,
				Callbacks: LeaderCallbacks{
					OnNewLeader: func(l string) {
						defer wg.Done()
						reportedLeader = l
					},
				},
			}
			observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
			le := &LeaderElector{
				config:            lec,
				observedRecord:    test.observedRecord,
				observedRawRecord: observedRawRecord,
				observedTime:      test.observedTime,
				clock:             clock.RealClock{},
			}
			if !le.tryAcquireOrRenew(context.Background()) {
				t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
			}

			le.maybeReportTransition()

			// Wait for a response to the leader transition, and add 1 so that we can track the final transition.
			wg.Wait()
			wg.Add(1)

			if test.expectSuccess != le.release() {
				t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
			}

			le.observedRecord.AcquireTime = metav1.Time{}
			le.observedRecord.RenewTime = metav1.Time{}
			if le.observedRecord.HolderIdentity != test.outHolder {
				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
			}
			if len(test.reactors) != len(c.Actions()) {
				t.Errorf("wrong number of api interactions")
			}
			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
				t.Errorf("leader should have transitioned but did not")
			}
			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
				t.Errorf("leader should not have transitioned but did")
			}

			le.maybeReportTransition()
			wg.Wait()
			if reportedLeader != test.outHolder {
				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
			}
		})
	}
}

// Will test leader election release using endpoints as the resource
func TestReleaseLeaseEndpoints(t *testing.T) {
	testReleaseLease(t, "endpoints")
}

// Will test leader election release using configmaps as the resource
func TestReleaseLeaseConfigMaps(t *testing.T) {
	testReleaseLease(t, "configmaps")
}

// Will test leader election release using leases as the resource
func TestReleaseLeaseLeases(t *testing.T) {
	testReleaseLease(t, "leases")
}