From d04c2ced1cc5a27f7c980eff65b5dfd20e73ce24 Mon Sep 17 00:00:00 2001 From: jackzhang Date: Mon, 29 Aug 2022 09:58:13 +0800 Subject: [PATCH] update lock getter of leaderelection The lock acquired by tryAcquireOrRenew is released when the leader ends leadership. However, due to the cancellation of the context, the lock may be set as an empty lock, so the Update cannot be run normally, resulting in a failure to release the lock. Signed-off-by: jackzhang Kubernetes-commit: 8690ff6264cceb38bd81dec99bb8affcc40286a9 --- tools/leaderelection/leaderelection_test.go | 239 ++++++++++++------ .../resourcelock/configmaplock.go | 4 +- .../resourcelock/endpointslock.go | 4 +- .../leaderelection/resourcelock/leaselock.go | 4 +- 4 files changed, 171 insertions(+), 80 deletions(-) diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go index 9691c8e0..6b5b0790 100644 --- a/tools/leaderelection/leaderelection_test.go +++ b/tools/leaderelection/leaderelection_test.go @@ -1019,58 +1019,21 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { onRelease = make(chan struct{}) lockObj runtime.Object + gets int updates int + wg sync.WaitGroup ) + resetVars := func() { + onNewLeader = make(chan struct{}) + onRenewCalled = make(chan struct{}) + onRenewResume = make(chan struct{}) + onRelease = make(chan struct{}) - resourceLockConfig := rl.ResourceLockConfig{ - Identity: "baz", - EventRecorder: &record.FakeRecorder{}, + lockObj = nil + gets = 0 + updates = 0 } - c := &fake.Clientset{} - - c.AddReactor("get", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - if lockObj != nil { - return true, lockObj, nil - } - return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) - }) - - // create lock - c.AddReactor("create", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - lockObj = action.(fakeclient.CreateAction).GetObject() - return true, lockObj, nil - }) - - c.AddReactor("update", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - updates++ - - // Second update (first renew) should return our canceled error - // FakeClient doesn't do anything with the context so we're doing this ourselves - if updates == 2 { - close(onRenewCalled) - <-onRenewResume - return true, nil, context.Canceled - } else if updates == 3 { - close(onRelease) - } - - lockObj = action.(fakeclient.UpdateAction).GetObject() - return true, lockObj, nil - - }) - - c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) { - t.Errorf("unreachable action. testclient called too many times: %+v", action) - return true, nil, fmt.Errorf("unreachable action") - }) - - lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig) - if err != nil { - t.Fatal("resourcelock.New() = ", err) - } - lec := LeaderElectionConfig{ - Lock: lock, LeaseDuration: 15 * time.Second, RenewDeadline: 2 * time.Second, RetryPeriod: 1 * time.Second, @@ -1087,40 +1050,168 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { }, } - elector, err := NewLeaderElector(lec) - if err != nil { - t.Fatal("Failed to create leader elector: ", err) + tests := []struct { + name string + reactors []Reactor + }{ + { + name: "release acquired lock on cancellation of update", + reactors: []Reactor{ + { + verb: "get", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + gets++ + if lockObj != nil { + return true, lockObj, nil + } + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + { + verb: "create", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + lockObj = action.(fakeclient.CreateAction).GetObject() + return true, lockObj, nil + }, + }, + { + verb: "update", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + updates++ + + // Second update (first renew) should return our canceled error + // FakeClient doesn't do anything with the context so we're doing this ourselves + if updates == 2 { + close(onRenewCalled) + <-onRenewResume + return true, nil, context.Canceled + } else if updates == 3 { + // We update the lock after the cancellation to release it + // This wg is to avoid the data race on lockObj + defer wg.Done() + close(onRelease) + } + + lockObj = action.(fakeclient.UpdateAction).GetObject() + return true, lockObj, nil + }, + }, + }, + }, + { + name: "release acquired lock on cancellation of get", + reactors: []Reactor{ + { + verb: "get", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + gets++ + if lockObj != nil { + // Third and more get (first create, second renew) should return our canceled error + // FakeClient doesn't do anything with the context so we're doing this ourselves + if gets >= 3 { + close(onRenewCalled) + <-onRenewResume + return true, nil, context.Canceled + } + return true, lockObj, nil + } + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + { + verb: "create", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + lockObj = action.(fakeclient.CreateAction).GetObject() + return true, lockObj, nil + }, + }, + { + verb: "update", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + updates++ + // Second update (first renew) should release the lock + if updates == 2 { + // We update the lock after the cancellation to release it + // This wg is to avoid the data race on lockObj + defer wg.Done() + close(onRelease) + } + + lockObj = action.(fakeclient.UpdateAction).GetObject() + return true, lockObj, nil + }, + }, + }, + }, } - ctx, cancel := context.WithCancel(context.Background()) + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + wg.Add(1) + resetVars() - go elector.Run(ctx) + resourceLockConfig := rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, + } + c := &fake.Clientset{} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, objectType, reactor.reaction) + } + c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) { + t.Errorf("unreachable action. testclient called too many times: %+v", action) + return true, nil, fmt.Errorf("unreachable action") + }) + lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig) + if err != nil { + t.Fatal("resourcelock.New() = ", err) + } - // Wait for us to become the leader - select { - case <-onNewLeader: - case <-time.After(10 * time.Second): - t.Fatal("failed to become the leader") - } + lec.Lock = lock + elector, err := NewLeaderElector(lec) + if err != nil { + t.Fatal("Failed to create leader elector: ", err) + } - // Wait for renew (update) to be invoked - select { - case <-onRenewCalled: - case <-time.After(10 * time.Second): - t.Fatal("the elector failed to renew the lock") - } + ctx, cancel := context.WithCancel(context.Background()) - // Cancel the context - stopping the elector while - // it's running - cancel() + go elector.Run(ctx) - // Resume the update call to return the cancellation - // which should trigger the release flow - close(onRenewResume) + // Wait for us to become the leader + select { + case <-onNewLeader: + case <-time.After(10 * time.Second): + t.Fatal("failed to become the leader") + } - select { - case <-onRelease: - case <-time.After(10 * time.Second): - t.Fatal("the lock was not released") + // Wait for renew (update) to be invoked + select { + case <-onRenewCalled: + case <-time.After(10 * time.Second): + t.Fatal("the elector failed to renew the lock") + } + + // Cancel the context - stopping the elector while + // it's running + cancel() + + // Resume the tryAcquireOrRenew call to return the cancellation + // which should trigger the release flow + close(onRenewResume) + + select { + case <-onRelease: + case <-time.After(10 * time.Second): + t.Fatal("the lock was not released") + } + wg.Wait() + }) } } diff --git a/tools/leaderelection/resourcelock/configmaplock.go b/tools/leaderelection/resourcelock/configmaplock.go index 57027289..e811fff0 100644 --- a/tools/leaderelection/resourcelock/configmaplock.go +++ b/tools/leaderelection/resourcelock/configmaplock.go @@ -44,11 +44,11 @@ type configMapLock struct { // Get returns the election record from a ConfigMap Annotation func (cml *configMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { var record LeaderElectionRecord - var err error - cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{}) + cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{}) if err != nil { return nil, nil, err } + cml.cm = cm if cml.cm.Annotations == nil { cml.cm.Annotations = make(map[string]string) } diff --git a/tools/leaderelection/resourcelock/endpointslock.go b/tools/leaderelection/resourcelock/endpointslock.go index af3fa162..eb36d221 100644 --- a/tools/leaderelection/resourcelock/endpointslock.go +++ b/tools/leaderelection/resourcelock/endpointslock.go @@ -39,11 +39,11 @@ type endpointsLock struct { // Get returns the election record from a Endpoints Annotation func (el *endpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { var record LeaderElectionRecord - var err error - el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{}) + ep, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{}) if err != nil { return nil, nil, err } + el.e = ep if el.e.Annotations == nil { el.e.Annotations = make(map[string]string) } diff --git a/tools/leaderelection/resourcelock/leaselock.go b/tools/leaderelection/resourcelock/leaselock.go index ab80d7f1..185ef0e5 100644 --- a/tools/leaderelection/resourcelock/leaselock.go +++ b/tools/leaderelection/resourcelock/leaselock.go @@ -39,11 +39,11 @@ type LeaseLock struct { // Get returns the election record from a Lease spec func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { - var err error - ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{}) + lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{}) if err != nil { return nil, nil, err } + ll.lease = lease record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec) recordByte, err := json.Marshal(*record) if err != nil {