Merge pull request #95939 from dprotaso/leaderelection-release

Address a scenario where releasing a resource lock fails if a prior update fails or is cancelled
commit 8bc8b11bcf
Kubernetes Prow Robot, 2020-10-28 13:54:06 -07:00, committed by GitHub
4 changed files with 146 additions and 7 deletions
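
For readers skimming the diff below: each lock implementation (ConfigMapLock, EndpointsLock, LeaseLock) caches the last object returned by the API server, and before this change Update assigned the call's result to that cache unconditionally. On a failed or cancelled update the result is nil (or zero-valued), so the cache was clobbered and the subsequent release write had no valid object left to update. A minimal, self-contained Go sketch of the two assignment patterns (the lock/fakeUpdate/cached names are illustrative stand-ins, not the actual client-go code):

package main

import (
	"errors"
	"fmt"
)

// fakeUpdate stands in for a client-go Update call: on failure it returns
// a nil object alongside the error, as the fake client in the test below does.
func fakeUpdate(in *string, fail bool) (*string, error) {
	if fail {
		return nil, errors.New("context canceled")
	}
	return in, nil
}

type lock struct{ cached *string }

// updateOld mirrors the pre-fix pattern: the result is assigned to the
// cache unconditionally, so a failed call clobbers it.
func (l *lock) updateOld(fail bool) error {
	var err error
	l.cached, err = fakeUpdate(l.cached, fail)
	return err
}

// updateNew mirrors the fix: assign to the cache only on success.
func (l *lock) updateNew(fail bool) error {
	out, err := fakeUpdate(l.cached, fail)
	if err != nil {
		return err
	}
	l.cached = out
	return nil
}

func main() {
	name := "lease"

	a := lock{cached: &name}
	_ = a.updateOld(true)
	fmt.Println("pre-fix cache after failed renew:", a.cached) // <nil>: a later release would fail

	b := lock{cached: &name}
	_ = b.updateNew(true)
	fmt.Println("fixed cache after failed renew:", b.cached) // still set: release can proceed
}

The change below applies the second pattern to all three lock implementations.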

staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go

@@ -1072,3 +1072,129 @@ func TestReleaseLeaseConfigMaps(t *testing.T) {
 func TestReleaseLeaseLeases(t *testing.T) {
 	testReleaseLease(t, "leases")
 }
+
+func TestReleaseOnCancellation_Endpoints(t *testing.T) {
+	testReleaseOnCancellation(t, "endpoints")
+}
+
+func TestReleaseOnCancellation_ConfigMaps(t *testing.T) {
+	testReleaseOnCancellation(t, "configmaps")
+}
+
+func TestReleaseOnCancellation_Leases(t *testing.T) {
+	testReleaseOnCancellation(t, "leases")
+}
+
+func testReleaseOnCancellation(t *testing.T, objectType string) {
+	var (
+		onNewLeader   = make(chan struct{})
+		onRenewCalled = make(chan struct{})
+		onRenewResume = make(chan struct{})
+		onRelease     = make(chan struct{})
+
+		lockObj runtime.Object
+		updates int
+	)
+
+	resourceLockConfig := rl.ResourceLockConfig{
+		Identity:      "baz",
+		EventRecorder: &record.FakeRecorder{},
+	}
+	c := &fake.Clientset{}
+
+	c.AddReactor("get", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+		if lockObj != nil {
+			return true, lockObj, nil
+		}
+		return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
+	})
+
+	// create lock
+	c.AddReactor("create", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+		lockObj = action.(fakeclient.CreateAction).GetObject()
+		return true, lockObj, nil
+	})
+
+	c.AddReactor("update", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
+		updates++
+
+		// Second update (first renew) should return our canceled error
+		// FakeClient doesn't do anything with the context so we're doing this ourselves
+		if updates == 2 {
+			close(onRenewCalled)
+			<-onRenewResume
+			return true, nil, context.Canceled
+		} else if updates == 3 {
+			close(onRelease)
+		}
+
+		lockObj = action.(fakeclient.UpdateAction).GetObject()
+		return true, lockObj, nil
+	})
+
+	c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
+		t.Errorf("unreachable action. testclient called too many times: %+v", action)
+		return true, nil, fmt.Errorf("unreachable action")
+	})
+
+	lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
+	if err != nil {
+		t.Fatal("resourcelock.New() = ", err)
+	}
+
+	lec := LeaderElectionConfig{
+		Lock:          lock,
+		LeaseDuration: 15 * time.Second,
+		RenewDeadline: 2 * time.Second,
+		RetryPeriod:   1 * time.Second,
+
+		// This is what we're testing
+		ReleaseOnCancel: true,
+
+		Callbacks: LeaderCallbacks{
+			OnNewLeader:      func(identity string) {},
+			OnStoppedLeading: func() {},
+			OnStartedLeading: func(context.Context) {
+				close(onNewLeader)
+			},
+		},
+	}
+
+	elector, err := NewLeaderElector(lec)
+	if err != nil {
+		t.Fatal("Failed to create leader elector: ", err)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go elector.Run(ctx)
+
+	// Wait for us to become the leader
+	select {
+	case <-onNewLeader:
+	case <-time.After(10 * time.Second):
+		t.Fatal("failed to become the leader")
+	}
+
+	// Wait for renew (update) to be invoked
+	select {
+	case <-onRenewCalled:
+	case <-time.After(10 * time.Second):
+		t.Fatal("the elector failed to renew the lock")
+	}
+
+	// Cancel the context - stopping the elector while
+	// it's running
+	cancel()
+
+	// Resume the update call to return the cancellation
+	// which should trigger the release flow
+	close(onRenewResume)
+
+	select {
+	case <-onRelease:
+	case <-time.After(10 * time.Second):
+		t.Fatal("the lock was not released")
+	}
+}
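
The behaviour exercised by this test is opt-in via ReleaseOnCancel. A consumer-side sketch of enabling it against a real cluster (in-cluster config, and the "my-namespace"/"my-lock"/"my-id" names, are illustrative assumptions, not part of this PR):

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Cancelling this context stops the elector; with ReleaseOnCancel it
	// also releases the lock so another candidate can take over quickly.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	lock, err := resourcelock.New(
		resourcelock.LeasesResourceLockType, // "leases", as in the test above
		"my-namespace", "my-lock",
		client.CoreV1(), client.CoordinationV1(),
		resourcelock.ResourceLockConfig{Identity: "my-id"},
	)
	if err != nil {
		log.Fatal(err)
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		// The fix in this commit makes the release below work even when
		// the renew in flight failed or was cancelled.
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { <-ctx.Done() },
			OnStoppedLeading: func() { log.Print("lost leadership") },
		},
	})
}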

staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go

@@ -93,9 +93,13 @@ func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord)
 		cml.cm.Annotations = make(map[string]string)
 	}
 	cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
-	cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
-	return err
+	cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	cml.cm = cm
+	return nil
 }
 
 // RecordEvent in leader election while adding meta-data
 func (cml *ConfigMapLock) RecordEvent(s string) {

staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go

@@ -88,9 +88,13 @@ func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) e
 		el.e.Annotations = make(map[string]string)
 	}
 	el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
-	el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
-	return err
+	e, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	el.e = e
+	return nil
 }
 
 // RecordEvent in leader election while adding meta-data
 func (el *EndpointsLock) RecordEvent(s string) {

staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go

@@ -71,11 +71,16 @@ func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error
 		return errors.New("lease not initialized, call get or create first")
 	}
 	ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
-	var err error
-	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
-	return err
+
+	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+
+	ll.lease = lease
+	return nil
 }
 
 // RecordEvent in leader election while adding meta-data
 func (ll *LeaseLock) RecordEvent(s string) {
 	if ll.LockConfig.EventRecorder == nil {