Pass context to tryAcquireOrRenew

Kubernetes-commit: c049f30ef2a83172f46a587ddaf2104b39df8301
This commit is contained in:
Ted Yu 2020-02-10 12:31:24 -08:00 committed by Kubernetes Publisher
parent 200280e336
commit d6bfb32c17
8 changed files with 48 additions and 55 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package leaderelection
import (
"context"
"fmt"
"testing"
"time"
@ -32,17 +33,17 @@ type fakeLock struct {
}
// Get is a dummy to allow us to have a fakeLock for testing.
func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) {
func (fl *fakeLock) Get(ctx context.Context) (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) {
return nil, nil, nil
}
// Create is a dummy to allow us to have a fakeLock for testing.
func (fl *fakeLock) Create(ler rl.LeaderElectionRecord) error {
func (fl *fakeLock) Create(ctx context.Context, ler rl.LeaderElectionRecord) error {
return nil
}
// Update is a dummy to allow us to have a fakeLock for testing.
func (fl *fakeLock) Update(ler rl.LeaderElectionRecord) error {
func (fl *fakeLock) Update(ctx context.Context, ler rl.LeaderElectionRecord) error {
return nil
}

View File

@ -241,7 +241,7 @@ func (le *LeaderElector) acquire(ctx context.Context) bool {
desc := le.config.Lock.Describe()
klog.Infof("attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() {
succeeded = le.tryAcquireOrRenew()
succeeded = le.tryAcquireOrRenew(ctx)
le.maybeReportTransition()
if !succeeded {
klog.V(4).Infof("failed to acquire lease %v", desc)
@ -263,18 +263,7 @@ func (le *LeaderElector) renew(ctx context.Context) {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
done := make(chan bool, 1)
go func() {
defer close(done)
done <- le.tryAcquireOrRenew()
}()
select {
case <-timeoutCtx.Done():
return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
case result := <-done:
return result, nil
}
return le.tryAcquireOrRenew(timeoutCtx), nil
}, timeoutCtx.Done())
le.maybeReportTransition()
@ -303,7 +292,7 @@ func (le *LeaderElector) release() bool {
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
}
if err := le.config.Lock.Update(leaderElectionRecord); err != nil {
if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
return false
}
@ -315,7 +304,7 @@ func (le *LeaderElector) release() bool {
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew() bool {
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
@ -325,13 +314,13 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
// 1. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
return false
}
@ -363,7 +352,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
// update the lock itself
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package leaderelection
import (
"context"
"encoding/json"
"fmt"
"sync"
@ -296,7 +297,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew() {
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
@ -879,7 +880,7 @@ func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew() {
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}

View File

@ -42,10 +42,10 @@ type ConfigMapLock struct {
}
// Get returns the election record from a ConfigMap Annotation
func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
func (cml *ConfigMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord
var err error
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(context.TODO(), cml.ConfigMapMeta.Name, metav1.GetOptions{})
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@ -62,12 +62,12 @@ func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a LeaderElectionRecord annotation
func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
func (cml *ConfigMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(context.TODO(), &v1.ConfigMap{
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cml.ConfigMapMeta.Name,
Namespace: cml.ConfigMapMeta.Namespace,
@ -80,7 +80,7 @@ func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error {
}
// Update will update an existing annotation on a given resource.
func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
func (cml *ConfigMapLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if cml.cm == nil {
return errors.New("configmap not initialized, call get or create first")
}
@ -89,7 +89,7 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
return err
}
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(context.TODO(), cml.cm, metav1.UpdateOptions{})
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
return err
}

View File

@ -37,10 +37,10 @@ type EndpointsLock struct {
}
// Get returns the election record from a Endpoints Annotation
func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
func (el *EndpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord
var err error
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(context.TODO(), el.EndpointsMeta.Name, metav1.GetOptions{})
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@ -57,12 +57,12 @@ func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a LeaderElectionRecord annotation
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
func (el *EndpointsLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(context.TODO(), &v1.Endpoints{
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(ctx, &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: el.EndpointsMeta.Name,
Namespace: el.EndpointsMeta.Namespace,
@ -75,7 +75,7 @@ func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
}
// Update will update and existing annotation on a given resource.
func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
func (el *EndpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if el.e == nil {
return errors.New("endpoint not initialized, call get or create first")
}
@ -87,7 +87,7 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
el.e.Annotations = make(map[string]string)
}
el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(context.TODO(), el.e, metav1.UpdateOptions{})
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
return err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package resourcelock
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -73,13 +74,13 @@ type ResourceLockConfig struct {
// by the leaderelection code.
type Interface interface {
// Get returns the LeaderElectionRecord
Get() (*LeaderElectionRecord, []byte, error)
Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
// Create attempts to create a LeaderElectionRecord
Create(ler LeaderElectionRecord) error
Create(ctx context.Context, ler LeaderElectionRecord) error
// Update will update and existing LeaderElectionRecord
Update(ler LeaderElectionRecord) error
Update(ctx context.Context, ler LeaderElectionRecord) error
// RecordEvent is used to record events
RecordEvent(string)

View File

@ -38,9 +38,9 @@ type LeaseLock struct {
}
// Get returns the election record from a Lease spec
func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(context.TODO(), ll.LeaseMeta.Name, metav1.GetOptions{})
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
@ -53,9 +53,9 @@ func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create a Lease
func (ll *LeaseLock) Create(ler LeaderElectionRecord) error {
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(context.TODO(), &coordinationv1.Lease{
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: ll.LeaseMeta.Name,
Namespace: ll.LeaseMeta.Namespace,
@ -66,13 +66,13 @@ func (ll *LeaseLock) Create(ler LeaderElectionRecord) error {
}
// Update will update an existing Lease spec.
func (ll *LeaseLock) Update(ler LeaderElectionRecord) error {
func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if ll.lease == nil {
return errors.New("lease not initialized, call get or create first")
}
ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(context.TODO(), ll.lease, metav1.UpdateOptions{})
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
return err
}

View File

@ -18,6 +18,7 @@ package resourcelock
import (
"bytes"
"context"
"encoding/json"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -34,13 +35,13 @@ type MultiLock struct {
}
// Get returns the older election record of the lock
func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
primary, primaryRaw, err := ml.Primary.Get()
func (ml *MultiLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
primary, primaryRaw, err := ml.Primary.Get(ctx)
if err != nil {
return nil, nil, err
}
secondary, secondaryRaw, err := ml.Secondary.Get()
secondary, secondaryRaw, err := ml.Secondary.Get(ctx)
if err != nil {
// Lock is held by old client
if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() {
@ -60,25 +61,25 @@ func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
}
// Create attempts to create both primary lock and secondary lock
func (ml *MultiLock) Create(ler LeaderElectionRecord) error {
err := ml.Primary.Create(ler)
func (ml *MultiLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
err := ml.Primary.Create(ctx, ler)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return ml.Secondary.Create(ler)
return ml.Secondary.Create(ctx, ler)
}
// Update will update and existing annotation on both two resources.
func (ml *MultiLock) Update(ler LeaderElectionRecord) error {
err := ml.Primary.Update(ler)
func (ml *MultiLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
err := ml.Primary.Update(ctx, ler)
if err != nil {
return err
}
_, _, err = ml.Secondary.Get()
_, _, err = ml.Secondary.Get(ctx)
if err != nil && apierrors.IsNotFound(err) {
return ml.Secondary.Create(ler)
return ml.Secondary.Create(ctx, ler)
}
return ml.Secondary.Update(ler)
return ml.Secondary.Update(ctx, ler)
}
// RecordEvent in leader election while adding meta-data