migrate leader election to lease API

Change-Id: I21fd5cdc1af59e456628cf15fc84b2d79db2eda0

Kubernetes-commit: 447295aff2bd8cdc92a2376553d83546a4d6eb41
This commit is contained in:
chenyixiang 2019-08-07 01:48:32 +08:00 committed by Kubernetes Publisher
parent 07054768d9
commit ba49d2a180
8 changed files with 761 additions and 88 deletions

View File

@ -21,9 +21,10 @@ import (
"testing" "testing"
"time" "time"
"net/http"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
rl "k8s.io/client-go/tools/leaderelection/resourcelock" rl "k8s.io/client-go/tools/leaderelection/resourcelock"
"net/http"
) )
type fakeLock struct { type fakeLock struct {
@ -31,8 +32,8 @@ type fakeLock struct {
} }
// Get is a dummy to allow us to have a fakeLock for testing. // Get is a dummy to allow us to have a fakeLock for testing.
func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, err error) { func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) {
return nil, nil return nil, nil, nil
} }
// Create is a dummy to allow us to have a fakeLock for testing. // Create is a dummy to allow us to have a fakeLock for testing.

View File

@ -53,9 +53,9 @@ limitations under the License.
package leaderelection package leaderelection
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"reflect"
"time" "time"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
@ -177,6 +177,7 @@ type LeaderElector struct {
config LeaderElectionConfig config LeaderElectionConfig
// internal bookkeeping // internal bookkeeping
observedRecord rl.LeaderElectionRecord observedRecord rl.LeaderElectionRecord
observedRawRecord []byte
observedTime time.Time observedTime time.Time
// used to implement OnNewLeader(), may lag slightly from the // used to implement OnNewLeader(), may lag slightly from the
// value observedRecord.HolderIdentity if the transition has // value observedRecord.HolderIdentity if the transition has
@ -324,7 +325,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
} }
// 1. obtain or create the ElectionRecord // 1. obtain or create the ElectionRecord
oldLeaderElectionRecord, err := le.config.Lock.Get() oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get()
if err != nil { if err != nil {
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
@ -340,8 +341,9 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
} }
// 2. Record obtained, check the Identity & Time // 2. Record obtained, check the Identity & Time
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) { if bytes.Compare(le.observedRawRecord, oldLeaderElectionRawRecord) != 0 {
le.observedRecord = *oldLeaderElectionRecord le.observedRecord = *oldLeaderElectionRecord
le.observedRawRecord = oldLeaderElectionRawRecord
le.observedTime = le.clock.Now() le.observedTime = le.clock.Now()
} }
if len(oldLeaderElectionRecord.HolderIdentity) > 0 && if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
@ -365,6 +367,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
klog.Errorf("Failed to update lock: %v", err) klog.Errorf("Failed to update lock: %v", err)
return false return false
} }
le.observedRecord = leaderElectionRecord le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now() le.observedTime = le.clock.Now()
return true return true

View File

@ -37,7 +37,7 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
) )
func createLockObject(objectType, namespace, name string, record rl.LeaderElectionRecord) (obj runtime.Object) { func createLockObject(t *testing.T, objectType, namespace, name string, record rl.LeaderElectionRecord) (obj runtime.Object) {
objectMeta := metav1.ObjectMeta{ objectMeta := metav1.ObjectMeta{
Namespace: namespace, Namespace: namespace,
Name: name, Name: name,
@ -59,7 +59,7 @@ func createLockObject(objectType, namespace, name string, record rl.LeaderElecti
spec := rl.LeaderElectionRecordToLeaseSpec(&record) spec := rl.LeaderElectionRecordToLeaseSpec(&record)
obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec} obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec}
default: default:
panic("unexpected objType:" + objectType) t.Fatal("unexpected objType:" + objectType)
} }
return return
} }
@ -69,6 +69,12 @@ func TestTryAcquireOrRenewEndpoints(t *testing.T) {
testTryAcquireOrRenew(t, "endpoints") testTryAcquireOrRenew(t, "endpoints")
} }
type Reactor struct {
verb string
objectType string
reaction fakeclient.ReactionFunc
}
func testTryAcquireOrRenew(t *testing.T, objectType string) { func testTryAcquireOrRenew(t *testing.T, objectType string) {
future := time.Now().Add(1000 * time.Hour) future := time.Now().Add(1000 * time.Hour)
past := time.Now().Add(-1000 * time.Hour) past := time.Now().Add(-1000 * time.Hour)
@ -77,10 +83,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
name string name string
observedRecord rl.LeaderElectionRecord observedRecord rl.LeaderElectionRecord
observedTime time.Time observedTime time.Time
reactors []struct { reactors []Reactor
verb string
reaction fakeclient.ReactionFunc
}
expectSuccess bool expectSuccess bool
transitionLeader bool transitionLeader bool
@ -88,10 +91,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}{ }{
{ {
name: "acquire from no object", name: "acquire from no object",
reactors: []struct { reactors: []Reactor{
verb string
reaction fakeclient.ReactionFunc
}{
{ {
verb: "get", verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
@ -110,14 +110,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}, },
{ {
name: "acquire from unled object", name: "acquire from unled object",
reactors: []struct { reactors: []Reactor{
verb string
reaction fakeclient.ReactionFunc
}{
{ {
verb: "get", verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
}, },
}, },
{ {
@ -134,14 +131,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}, },
{ {
name: "acquire from led, unacked object", name: "acquire from led, unacked object",
reactors: []struct { reactors: []Reactor{
verb string
reaction fakeclient.ReactionFunc
}{
{ {
verb: "get", verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
}, },
}, },
{ {
@ -160,14 +154,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}, },
{ {
name: "acquire from empty led, acked object", name: "acquire from empty led, acked object",
reactors: []struct { reactors: []Reactor{
verb string
reaction fakeclient.ReactionFunc
}{
{ {
verb: "get", verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: ""}), nil return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: ""}), nil
}, },
}, },
{ {
@ -185,14 +176,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}, },
{ {
name: "don't acquire from led, acked object", name: "don't acquire from led, acked object",
reactors: []struct { reactors: []Reactor{
verb string
reaction fakeclient.ReactionFunc
}{
{ {
verb: "get", verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
}, },
}, },
}, },
@ -203,14 +191,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}, },
{ {
name: "renew already acquired object", name: "renew already acquired object",
reactors: []struct { reactors: []Reactor{
verb string
reaction fakeclient.ReactionFunc
}{
{ {
verb: "get", verb: "get",
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
}, },
}, },
{ {
@ -282,13 +267,14 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
}, },
}, },
} }
observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
le := &LeaderElector{ le := &LeaderElector{
config: lec, config: lec,
observedRecord: test.observedRecord, observedRecord: test.observedRecord,
observedRawRecord: observedRawRecord,
observedTime: test.observedTime, observedTime: test.observedTime,
clock: clock.RealClock{}, clock: clock.RealClock{},
} }
if test.expectSuccess != le.tryAcquireOrRenew() { if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
} }
@ -352,3 +338,560 @@ func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) {
t.Errorf("diff: %v", diff.ObjectReflectDiff(oldRecord, newRecord)) t.Errorf("diff: %v", diff.ObjectReflectDiff(oldRecord, newRecord))
} }
} }
func multiLockType(t *testing.T, objectType string) (primaryType, secondaryType string) {
switch objectType {
case rl.EndpointsLeasesResourceLock:
return rl.EndpointsResourceLock, rl.LeasesResourceLock
case rl.ConfigMapsLeasesResourceLock:
return rl.ConfigMapsResourceLock, rl.LeasesResourceLock
default:
t.Fatal("unexpected objType:" + objectType)
}
return
}
func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRecord) (ret []byte) {
var err error
switch objectType {
case "endpoints", "configmaps", "leases":
ret, err = json.Marshal(ler)
if err != nil {
t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
}
case "endpointsleases", "configmapsleases":
recordBytes, err := json.Marshal(ler)
if err != nil {
t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
}
ret = rl.ConcatRawRecord(recordBytes, recordBytes)
default:
t.Fatal("unexpected objType:" + objectType)
}
return
}
func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
future := time.Now().Add(1000 * time.Hour)
past := time.Now().Add(-1000 * time.Hour)
primaryType, secondaryType := multiLockType(t, objectType)
tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedRawRecord []byte
observedTime time.Time
reactors []Reactor
expectSuccess bool
transitionLeader bool
outHolder string
}{
{
name: "acquire from no object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
expectSuccess: true,
outHolder: "baz",
},
{
name: "acquire from unled old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from unled transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from led, unack old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from led, unack transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from conflict led, ack transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: rl.UnknownLeader,
},
{
name: "acquire from led, unack unknown object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "don't acquire from led, ack old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: "bing",
},
{
name: "don't acquire from led, acked new object, observe new record",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, secondaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: rl.UnknownLeader,
},
{
name: "don't acquire from led, acked new object, observe transition record",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: "bing",
},
{
name: "renew already required object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "baz"}),
observedTime: future,
expectSuccess: true,
outHolder: "baz",
},
}
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
var lock rl.Interface
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
}
c := &fake.Clientset{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, reactor.objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
switch objectType {
case rl.EndpointsLeasesResourceLock:
lock = &rl.MultiLock{
Primary: &rl.EndpointsLock{
EndpointsMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoreV1(),
},
Secondary: &rl.LeaseLock{
LeaseMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoordinationV1(),
},
}
case rl.ConfigMapsLeasesResourceLock:
lock = &rl.MultiLock{
Primary: &rl.ConfigMapLock{
ConfigMapMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoreV1(),
},
Secondary: &rl.LeaseLock{
LeaseMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c.CoordinationV1(),
},
}
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
},
},
}
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedRawRecord: test.observedRawRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{}
if le.observedRecord.HolderIdentity != test.outHolder {
t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
}
if len(test.reactors) != len(c.Actions()) {
t.Errorf("wrong number of api interactions")
}
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
t.Errorf("leader should have transitioned but did not")
}
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("leader should not have transitioned but did")
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
}
})
}
}
// Will test leader election using endpointsleases as the resource
func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) {
testTryAcquireOrRenewMultiLock(t, "endpointsleases")
}
// Will test leader election using configmapsleases as the resource
func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) {
testTryAcquireOrRenewMultiLock(t, "configmapsleases")
}

View File

@ -41,22 +41,23 @@ type ConfigMapLock struct {
} }
// Get returns the election record from a ConfigMap Annotation // Get returns the election record from a ConfigMap Annotation
func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, error) { func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord var record LeaderElectionRecord
var err error var err error
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name, metav1.GetOptions{}) cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name, metav1.GetOptions{})
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if cml.cm.Annotations == nil { if cml.cm.Annotations == nil {
cml.cm.Annotations = make(map[string]string) cml.cm.Annotations = make(map[string]string)
} }
if recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]; found { recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]
if found {
if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
return nil, err return nil, nil, err
} }
} }
return &record, nil return &record, []byte(recordBytes), nil
} }
// Create attempts to create a LeaderElectionRecord annotation // Create attempts to create a LeaderElectionRecord annotation
@ -106,7 +107,7 @@ func (cml *ConfigMapLock) Describe() string {
return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name) return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name)
} }
// returns the Identity of the lock // Identity returns the Identity of the lock
func (cml *ConfigMapLock) Identity() string { func (cml *ConfigMapLock) Identity() string {
return cml.LockConfig.Identity return cml.LockConfig.Identity
} }

View File

@ -36,22 +36,23 @@ type EndpointsLock struct {
} }
// Get returns the election record from a Endpoints Annotation // Get returns the election record from a Endpoints Annotation
func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) { func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord var record LeaderElectionRecord
var err error var err error
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{}) el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{})
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if el.e.Annotations == nil { if el.e.Annotations == nil {
el.e.Annotations = make(map[string]string) el.e.Annotations = make(map[string]string)
} }
if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found { recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]
if found {
if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
return nil, err return nil, nil, err
} }
} }
return &record, nil return &record, []byte(recordBytes), nil
} }
// Create attempts to create a LeaderElectionRecord annotation // Create attempts to create a LeaderElectionRecord annotation
@ -101,7 +102,7 @@ func (el *EndpointsLock) Describe() string {
return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name) return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name)
} }
// returns the Identity of the lock // Identity returns the Identity of the lock
func (el *EndpointsLock) Identity() string { func (el *EndpointsLock) Identity() string {
return el.LockConfig.Identity return el.LockConfig.Identity
} }

View File

@ -30,6 +30,8 @@ const (
EndpointsResourceLock = "endpoints" EndpointsResourceLock = "endpoints"
ConfigMapsResourceLock = "configmaps" ConfigMapsResourceLock = "configmaps"
LeasesResourceLock = "leases" LeasesResourceLock = "leases"
EndpointsLeasesResourceLock = "endpointsleases"
ConfigMapsLeasesResourceLock = "configmapsleases"
) )
// LeaderElectionRecord is the record that is stored in the leader election annotation. // LeaderElectionRecord is the record that is stored in the leader election annotation.
@ -71,7 +73,7 @@ type ResourceLockConfig struct {
// by the leaderelection code. // by the leaderelection code.
type Interface interface { type Interface interface {
// Get returns the LeaderElectionRecord // Get returns the LeaderElectionRecord
Get() (*LeaderElectionRecord, error) Get() (*LeaderElectionRecord, []byte, error)
// Create attempts to create a LeaderElectionRecord // Create attempts to create a LeaderElectionRecord
Create(ler LeaderElectionRecord) error Create(ler LeaderElectionRecord) error
@ -92,33 +94,46 @@ type Interface interface {
// Manufacture will create a lock of a given type according to the input parameters // Manufacture will create a lock of a given type according to the input parameters
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) { func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
switch lockType { endpointsLock := &EndpointsLock{
case EndpointsResourceLock:
return &EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{ EndpointsMeta: metav1.ObjectMeta{
Namespace: ns, Namespace: ns,
Name: name, Name: name,
}, },
Client: coreClient, Client: coreClient,
LockConfig: rlc, LockConfig: rlc,
}, nil }
case ConfigMapsResourceLock: configmapLock := &ConfigMapLock{
return &ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{ ConfigMapMeta: metav1.ObjectMeta{
Namespace: ns, Namespace: ns,
Name: name, Name: name,
}, },
Client: coreClient, Client: coreClient,
LockConfig: rlc, LockConfig: rlc,
}, nil }
case LeasesResourceLock: leaseLock := &LeaseLock{
return &LeaseLock{
LeaseMeta: metav1.ObjectMeta{ LeaseMeta: metav1.ObjectMeta{
Namespace: ns, Namespace: ns,
Name: name, Name: name,
}, },
Client: coordinationClient, Client: coordinationClient,
LockConfig: rlc, LockConfig: rlc,
}
switch lockType {
case EndpointsResourceLock:
return endpointsLock, nil
case ConfigMapsResourceLock:
return configmapLock, nil
case LeasesResourceLock:
return leaseLock, nil
case EndpointsLeasesResourceLock:
return &MultiLock{
Primary: endpointsLock,
Secondary: leaseLock,
}, nil
case ConfigMapsLeasesResourceLock:
return &MultiLock{
Primary: configmapLock,
Secondary: leaseLock,
}, nil }, nil
default: default:
return nil, fmt.Errorf("Invalid lock-type %s", lockType) return nil, fmt.Errorf("Invalid lock-type %s", lockType)

View File

@ -17,6 +17,7 @@ limitations under the License.
package resourcelock package resourcelock
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -36,13 +37,18 @@ type LeaseLock struct {
} }
// Get returns the election record from a Lease spec // Get returns the election record from a Lease spec
func (ll *LeaseLock) Get() (*LeaderElectionRecord, error) { func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) {
var err error var err error
ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ll.LeaseMeta.Name, metav1.GetOptions{}) ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ll.LeaseMeta.Name, metav1.GetOptions{})
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return LeaseSpecToLeaderElectionRecord(&ll.lease.Spec), nil record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
recordByte, err := json.Marshal(*record)
if err != nil {
return nil, nil, err
}
return record, recordByte, nil
} }
// Create attempts to create a Lease // Create attempts to create a Lease
@ -84,7 +90,7 @@ func (ll *LeaseLock) Describe() string {
return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name) return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name)
} }
// returns the Identity of the lock // Identity returns the Identity of the lock
func (ll *LeaseLock) Identity() string { func (ll *LeaseLock) Identity() string {
return ll.LockConfig.Identity return ll.LockConfig.Identity
} }

View File

@ -0,0 +1,103 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcelock
import (
"bytes"
"encoding/json"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)
const (
UnknownLeader = "leaderelection.k8s.io/unknown"
)
// MultiLock is used for lock's migration
type MultiLock struct {
Primary Interface
Secondary Interface
}
// Get returns the older election record of the lock
func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) {
primary, primaryRaw, err := ml.Primary.Get()
if err != nil {
return nil, nil, err
}
secondary, secondaryRaw, err := ml.Secondary.Get()
if err != nil {
// Lock is held by old client
if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() {
return primary, primaryRaw, nil
}
return nil, nil, err
}
if primary.HolderIdentity != secondary.HolderIdentity {
primary.HolderIdentity = UnknownLeader
primaryRaw, err = json.Marshal(primary)
if err != nil {
return nil, nil, err
}
}
return primary, ConcatRawRecord(primaryRaw, secondaryRaw), nil
}
// Create attempts to create both primary lock and secondary lock
func (ml *MultiLock) Create(ler LeaderElectionRecord) error {
err := ml.Primary.Create(ler)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return ml.Secondary.Create(ler)
}
// Update will update and existing annotation on both two resources.
func (ml *MultiLock) Update(ler LeaderElectionRecord) error {
err := ml.Primary.Update(ler)
if err != nil {
return err
}
_, _, err = ml.Secondary.Get()
if err != nil && apierrors.IsNotFound(err) {
return ml.Secondary.Create(ler)
}
return ml.Secondary.Update(ler)
}
// RecordEvent in leader election while adding meta-data
func (ml *MultiLock) RecordEvent(s string) {
ml.Primary.RecordEvent(s)
ml.Secondary.RecordEvent(s)
}
// Describe is used to convert details on current resource lock
// into a string
func (ml *MultiLock) Describe() string {
return ml.Primary.Describe()
}
// Identity returns the Identity of the lock
func (ml *MultiLock) Identity() string {
return ml.Primary.Identity()
}
func ConcatRawRecord(primaryRaw, secondaryRaw []byte) []byte {
return bytes.Join([][]byte{primaryRaw, secondaryRaw}, []byte(","))
}