diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go index 9691c8e0..6b5b0790 100644 --- a/tools/leaderelection/leaderelection_test.go +++ b/tools/leaderelection/leaderelection_test.go @@ -1019,58 +1019,21 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { onRelease = make(chan struct{}) lockObj runtime.Object + gets int updates int + wg sync.WaitGroup ) + resetVars := func() { + onNewLeader = make(chan struct{}) + onRenewCalled = make(chan struct{}) + onRenewResume = make(chan struct{}) + onRelease = make(chan struct{}) - resourceLockConfig := rl.ResourceLockConfig{ - Identity: "baz", - EventRecorder: &record.FakeRecorder{}, + lockObj = nil + gets = 0 + updates = 0 } - c := &fake.Clientset{} - - c.AddReactor("get", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - if lockObj != nil { - return true, lockObj, nil - } - return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) - }) - - // create lock - c.AddReactor("create", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - lockObj = action.(fakeclient.CreateAction).GetObject() - return true, lockObj, nil - }) - - c.AddReactor("update", objectType, func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - updates++ - - // Second update (first renew) should return our canceled error - // FakeClient doesn't do anything with the context so we're doing this ourselves - if updates == 2 { - close(onRenewCalled) - <-onRenewResume - return true, nil, context.Canceled - } else if updates == 3 { - close(onRelease) - } - - lockObj = action.(fakeclient.UpdateAction).GetObject() - return true, lockObj, nil - - }) - - c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) { - t.Errorf("unreachable action. testclient called too many times: %+v", action) - return true, nil, fmt.Errorf("unreachable action") - }) - - lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig) - if err != nil { - t.Fatal("resourcelock.New() = ", err) - } - lec := LeaderElectionConfig{ - Lock: lock, LeaseDuration: 15 * time.Second, RenewDeadline: 2 * time.Second, RetryPeriod: 1 * time.Second, @@ -1087,40 +1050,168 @@ func testReleaseOnCancellation(t *testing.T, objectType string) { }, } - elector, err := NewLeaderElector(lec) - if err != nil { - t.Fatal("Failed to create leader elector: ", err) + tests := []struct { + name string + reactors []Reactor + }{ + { + name: "release acquired lock on cancellation of update", + reactors: []Reactor{ + { + verb: "get", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + gets++ + if lockObj != nil { + return true, lockObj, nil + } + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + { + verb: "create", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + lockObj = action.(fakeclient.CreateAction).GetObject() + return true, lockObj, nil + }, + }, + { + verb: "update", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + updates++ + + // Second update (first renew) should return our canceled error + // FakeClient doesn't do anything with the context so we're doing this ourselves + if updates == 2 { + close(onRenewCalled) + <-onRenewResume + return true, nil, context.Canceled + } else if updates == 3 { + // We update the lock after the cancellation to release it + // This wg is to avoid the data race on lockObj + defer wg.Done() + close(onRelease) + } + + lockObj = action.(fakeclient.UpdateAction).GetObject() + return true, lockObj, nil + }, + }, + }, + }, + { + name: "release acquired lock on cancellation of get", + reactors: []Reactor{ + { + verb: "get", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + gets++ + if lockObj != nil { + // Third and more get (first create, second renew) should return our canceled error + // FakeClient doesn't do anything with the context so we're doing this ourselves + if gets >= 3 { + close(onRenewCalled) + <-onRenewResume + return true, nil, context.Canceled + } + return true, lockObj, nil + } + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + { + verb: "create", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + lockObj = action.(fakeclient.CreateAction).GetObject() + return true, lockObj, nil + }, + }, + { + verb: "update", + objectType: objectType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + updates++ + // Second update (first renew) should release the lock + if updates == 2 { + // We update the lock after the cancellation to release it + // This wg is to avoid the data race on lockObj + defer wg.Done() + close(onRelease) + } + + lockObj = action.(fakeclient.UpdateAction).GetObject() + return true, lockObj, nil + }, + }, + }, + }, } - ctx, cancel := context.WithCancel(context.Background()) + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + wg.Add(1) + resetVars() - go elector.Run(ctx) + resourceLockConfig := rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, + } + c := &fake.Clientset{} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, objectType, reactor.reaction) + } + c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) { + t.Errorf("unreachable action. testclient called too many times: %+v", action) + return true, nil, fmt.Errorf("unreachable action") + }) + lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig) + if err != nil { + t.Fatal("resourcelock.New() = ", err) + } - // Wait for us to become the leader - select { - case <-onNewLeader: - case <-time.After(10 * time.Second): - t.Fatal("failed to become the leader") - } + lec.Lock = lock + elector, err := NewLeaderElector(lec) + if err != nil { + t.Fatal("Failed to create leader elector: ", err) + } - // Wait for renew (update) to be invoked - select { - case <-onRenewCalled: - case <-time.After(10 * time.Second): - t.Fatal("the elector failed to renew the lock") - } + ctx, cancel := context.WithCancel(context.Background()) - // Cancel the context - stopping the elector while - // it's running - cancel() + go elector.Run(ctx) - // Resume the update call to return the cancellation - // which should trigger the release flow - close(onRenewResume) + // Wait for us to become the leader + select { + case <-onNewLeader: + case <-time.After(10 * time.Second): + t.Fatal("failed to become the leader") + } - select { - case <-onRelease: - case <-time.After(10 * time.Second): - t.Fatal("the lock was not released") + // Wait for renew (update) to be invoked + select { + case <-onRenewCalled: + case <-time.After(10 * time.Second): + t.Fatal("the elector failed to renew the lock") + } + + // Cancel the context - stopping the elector while + // it's running + cancel() + + // Resume the tryAcquireOrRenew call to return the cancellation + // which should trigger the release flow + close(onRenewResume) + + select { + case <-onRelease: + case <-time.After(10 * time.Second): + t.Fatal("the lock was not released") + } + wg.Wait() + }) } } diff --git a/tools/leaderelection/resourcelock/configmaplock.go b/tools/leaderelection/resourcelock/configmaplock.go index 57027289..e811fff0 100644 --- a/tools/leaderelection/resourcelock/configmaplock.go +++ b/tools/leaderelection/resourcelock/configmaplock.go @@ -44,11 +44,11 @@ type configMapLock struct { // Get returns the election record from a ConfigMap Annotation func (cml *configMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { var record LeaderElectionRecord - var err error - cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{}) + cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{}) if err != nil { return nil, nil, err } + cml.cm = cm if cml.cm.Annotations == nil { cml.cm.Annotations = make(map[string]string) } diff --git a/tools/leaderelection/resourcelock/endpointslock.go b/tools/leaderelection/resourcelock/endpointslock.go index af3fa162..eb36d221 100644 --- a/tools/leaderelection/resourcelock/endpointslock.go +++ b/tools/leaderelection/resourcelock/endpointslock.go @@ -39,11 +39,11 @@ type endpointsLock struct { // Get returns the election record from a Endpoints Annotation func (el *endpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { var record LeaderElectionRecord - var err error - el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{}) + ep, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{}) if err != nil { return nil, nil, err } + el.e = ep if el.e.Annotations == nil { el.e.Annotations = make(map[string]string) } diff --git a/tools/leaderelection/resourcelock/leaselock.go b/tools/leaderelection/resourcelock/leaselock.go index ab80d7f1..185ef0e5 100644 --- a/tools/leaderelection/resourcelock/leaselock.go +++ b/tools/leaderelection/resourcelock/leaselock.go @@ -39,11 +39,11 @@ type LeaseLock struct { // Get returns the election record from a Lease spec func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { - var err error - ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{}) + lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{}) if err != nil { return nil, nil, err } + ll.lease = lease record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec) recordByte, err := json.Marshal(*record) if err != nil {