diff --git a/tools/leaderelection/healthzadaptor_test.go b/tools/leaderelection/healthzadaptor_test.go index 8226c3cf..df209194 100644 --- a/tools/leaderelection/healthzadaptor_test.go +++ b/tools/leaderelection/healthzadaptor_test.go @@ -21,9 +21,10 @@ import ( "testing" "time" + "net/http" + "k8s.io/apimachinery/pkg/util/clock" rl "k8s.io/client-go/tools/leaderelection/resourcelock" - "net/http" ) type fakeLock struct { @@ -31,8 +32,8 @@ type fakeLock struct { } // Get is a dummy to allow us to have a fakeLock for testing. -func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, err error) { - return nil, nil +func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, rawRecord []byte, err error) { + return nil, nil, nil } // Create is a dummy to allow us to have a fakeLock for testing. diff --git a/tools/leaderelection/leaderelection.go b/tools/leaderelection/leaderelection.go index 4be650c0..4e6d3ecd 100644 --- a/tools/leaderelection/leaderelection.go +++ b/tools/leaderelection/leaderelection.go @@ -53,9 +53,9 @@ limitations under the License. package leaderelection import ( + "bytes" "context" "fmt" - "reflect" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -176,8 +176,9 @@ type LeaderCallbacks struct { type LeaderElector struct { config LeaderElectionConfig // internal bookkeeping - observedRecord rl.LeaderElectionRecord - observedTime time.Time + observedRecord rl.LeaderElectionRecord + observedRawRecord []byte + observedTime time.Time // used to implement OnNewLeader(), may lag slightly from the // value observedRecord.HolderIdentity if the transition has // not yet been reported. @@ -324,7 +325,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { } // 1. obtain or create the ElectionRecord - oldLeaderElectionRecord, err := le.config.Lock.Get() + oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get() if err != nil { if !errors.IsNotFound(err) { klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) @@ -340,8 +341,9 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { } // 2. Record obtained, check the Identity & Time - if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) { + if bytes.Compare(le.observedRawRecord, oldLeaderElectionRawRecord) != 0 { le.observedRecord = *oldLeaderElectionRecord + le.observedRawRecord = oldLeaderElectionRawRecord le.observedTime = le.clock.Now() } if len(oldLeaderElectionRecord.HolderIdentity) > 0 && @@ -365,6 +367,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { klog.Errorf("Failed to update lock: %v", err) return false } + le.observedRecord = leaderElectionRecord le.observedTime = le.clock.Now() return true diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go index d2fe5964..af5b8c8f 100644 --- a/tools/leaderelection/leaderelection_test.go +++ b/tools/leaderelection/leaderelection_test.go @@ -37,7 +37,7 @@ import ( "k8s.io/client-go/tools/record" ) -func createLockObject(objectType, namespace, name string, record rl.LeaderElectionRecord) (obj runtime.Object) { +func createLockObject(t *testing.T, objectType, namespace, name string, record rl.LeaderElectionRecord) (obj runtime.Object) { objectMeta := metav1.ObjectMeta{ Namespace: namespace, Name: name, @@ -59,7 +59,7 @@ func createLockObject(objectType, namespace, name string, record rl.LeaderElecti spec := rl.LeaderElectionRecordToLeaseSpec(&record) obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec} default: - panic("unexpected objType:" + objectType) + t.Fatal("unexpected objType:" + objectType) } return } @@ -69,6 +69,12 @@ func TestTryAcquireOrRenewEndpoints(t *testing.T) { testTryAcquireOrRenew(t, "endpoints") } +type Reactor struct { + verb string + objectType string + reaction fakeclient.ReactionFunc +} + func testTryAcquireOrRenew(t *testing.T, objectType string) { future := time.Now().Add(1000 * time.Hour) past := time.Now().Add(-1000 * time.Hour) @@ -77,10 +83,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { name string observedRecord rl.LeaderElectionRecord observedTime time.Time - reactors []struct { - verb string - reaction fakeclient.ReactionFunc - } + reactors []Reactor expectSuccess bool transitionLeader bool @@ -88,10 +91,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }{ { name: "acquire from no object", - reactors: []struct { - verb string - reaction fakeclient.ReactionFunc - }{ + reactors: []Reactor{ { verb: "get", reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { @@ -110,14 +110,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, { name: "acquire from unled object", - reactors: []struct { - verb string - reaction fakeclient.ReactionFunc - }{ + reactors: []Reactor{ { verb: "get", reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil }, }, { @@ -134,14 +131,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, { name: "acquire from led, unacked object", - reactors: []struct { - verb string - reaction fakeclient.ReactionFunc - }{ + reactors: []Reactor{ { verb: "get", reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil }, }, { @@ -160,14 +154,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, { name: "acquire from empty led, acked object", - reactors: []struct { - verb string - reaction fakeclient.ReactionFunc - }{ + reactors: []Reactor{ { verb: "get", reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: ""}), nil + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: ""}), nil }, }, { @@ -185,14 +176,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, { name: "don't acquire from led, acked object", - reactors: []struct { - verb string - reaction fakeclient.ReactionFunc - }{ + reactors: []Reactor{ { verb: "get", reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil }, }, }, @@ -203,14 +191,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, { name: "renew already acquired object", - reactors: []struct { - verb string - reaction fakeclient.ReactionFunc - }{ + reactors: []Reactor{ { verb: "get", reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { - return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil }, }, { @@ -282,13 +267,14 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, }, } + observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord) le := &LeaderElector{ - config: lec, - observedRecord: test.observedRecord, - observedTime: test.observedTime, - clock: clock.RealClock{}, + config: lec, + observedRecord: test.observedRecord, + observedRawRecord: observedRawRecord, + observedTime: test.observedTime, + clock: clock.RealClock{}, } - if test.expectSuccess != le.tryAcquireOrRenew() { t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) } @@ -352,3 +338,560 @@ func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) { t.Errorf("diff: %v", diff.ObjectReflectDiff(oldRecord, newRecord)) } } + +func multiLockType(t *testing.T, objectType string) (primaryType, secondaryType string) { + switch objectType { + case rl.EndpointsLeasesResourceLock: + return rl.EndpointsResourceLock, rl.LeasesResourceLock + case rl.ConfigMapsLeasesResourceLock: + return rl.ConfigMapsResourceLock, rl.LeasesResourceLock + default: + t.Fatal("unexpected objType:" + objectType) + } + return +} + +func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRecord) (ret []byte) { + var err error + switch objectType { + case "endpoints", "configmaps", "leases": + ret, err = json.Marshal(ler) + if err != nil { + t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err) + } + case "endpointsleases", "configmapsleases": + recordBytes, err := json.Marshal(ler) + if err != nil { + t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err) + } + ret = rl.ConcatRawRecord(recordBytes, recordBytes) + default: + t.Fatal("unexpected objType:" + objectType) + } + return +} + +func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) { + future := time.Now().Add(1000 * time.Hour) + past := time.Now().Add(-1000 * time.Hour) + primaryType, secondaryType := multiLockType(t, objectType) + tests := []struct { + name string + observedRecord rl.LeaderElectionRecord + observedRawRecord []byte + observedTime time.Time + reactors []Reactor + + expectSuccess bool + transitionLeader bool + outHolder string + }{ + { + name: "acquire from no object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + { + verb: "create", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil + }, + }, + { + verb: "create", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil + }, + }, + }, + expectSuccess: true, + outHolder: "baz", + }, + { + name: "acquire from unled old object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + { + verb: "update", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + { + verb: "create", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil + }, + }, + }, + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "acquire from unled transition object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil + }, + }, + { + verb: "update", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil + }, + }, + { + verb: "update", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + }, + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "acquire from led, unack old object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + { + verb: "update", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + { + verb: "create", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"}, + observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}), + observedTime: past, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "acquire from led, unack transition object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + { + verb: "update", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + { + verb: "update", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"}, + observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}), + observedTime: past, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "acquire from conflict led, ack transition object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"}, + observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}), + observedTime: future, + + expectSuccess: false, + outHolder: rl.UnknownLeader, + }, + { + name: "acquire from led, unack unknown object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil + }, + }, + { + verb: "update", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil + }, + }, + { + verb: "update", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}, + observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), + observedTime: past, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "don't acquire from led, ack old object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"}, + observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}), + observedTime: future, + + expectSuccess: false, + outHolder: "bing", + }, + { + name: "don't acquire from led, acked new object, observe new record", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"}, + observedRawRecord: GetRawRecordOrDie(t, secondaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}), + observedTime: future, + + expectSuccess: false, + outHolder: rl.UnknownLeader, + }, + { + name: "don't acquire from led, acked new object, observe transition record", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"}, + observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}), + observedTime: future, + + expectSuccess: false, + outHolder: "bing", + }, + { + name: "renew already required object", + reactors: []Reactor{ + { + verb: "get", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil + }, + }, + { + verb: "update", + objectType: primaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + { + verb: "get", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil + }, + }, + { + verb: "update", + objectType: secondaryType, + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.UpdateAction).GetObject(), nil + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"}, + observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "baz"}), + observedTime: future, + + expectSuccess: true, + outHolder: "baz", + }, + } + + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + // OnNewLeader is called async so we have to wait for it. + var wg sync.WaitGroup + wg.Add(1) + var reportedLeader string + var lock rl.Interface + + objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} + resourceLockConfig := rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, + } + c := &fake.Clientset{} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, reactor.objectType, reactor.reaction) + } + c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) { + t.Errorf("unreachable action. testclient called too many times: %+v", action) + return true, nil, fmt.Errorf("unreachable action") + }) + + switch objectType { + case rl.EndpointsLeasesResourceLock: + lock = &rl.MultiLock{ + Primary: &rl.EndpointsLock{ + EndpointsMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c.CoreV1(), + }, + Secondary: &rl.LeaseLock{ + LeaseMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c.CoordinationV1(), + }, + } + case rl.ConfigMapsLeasesResourceLock: + lock = &rl.MultiLock{ + Primary: &rl.ConfigMapLock{ + ConfigMapMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c.CoreV1(), + }, + Secondary: &rl.LeaseLock{ + LeaseMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c.CoordinationV1(), + }, + } + } + + lec := LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 10 * time.Second, + Callbacks: LeaderCallbacks{ + OnNewLeader: func(l string) { + defer wg.Done() + reportedLeader = l + }, + }, + } + le := &LeaderElector{ + config: lec, + observedRecord: test.observedRecord, + observedRawRecord: test.observedRawRecord, + observedTime: test.observedTime, + clock: clock.RealClock{}, + } + if test.expectSuccess != le.tryAcquireOrRenew() { + t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) + } + + le.observedRecord.AcquireTime = metav1.Time{} + le.observedRecord.RenewTime = metav1.Time{} + if le.observedRecord.HolderIdentity != test.outHolder { + t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity) + } + if len(test.reactors) != len(c.Actions()) { + t.Errorf("wrong number of api interactions") + } + if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { + t.Errorf("leader should have transitioned but did not") + } + if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { + t.Errorf("leader should not have transitioned but did") + } + + le.maybeReportTransition() + wg.Wait() + if reportedLeader != test.outHolder { + t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) + } + }) + } +} + +// Will test leader election using endpointsleases as the resource +func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) { + testTryAcquireOrRenewMultiLock(t, "endpointsleases") +} + +// Will test leader election using configmapsleases as the resource +func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) { + testTryAcquireOrRenewMultiLock(t, "configmapsleases") +} diff --git a/tools/leaderelection/resourcelock/configmaplock.go b/tools/leaderelection/resourcelock/configmaplock.go index 78535689..fd152b07 100644 --- a/tools/leaderelection/resourcelock/configmaplock.go +++ b/tools/leaderelection/resourcelock/configmaplock.go @@ -41,22 +41,23 @@ type ConfigMapLock struct { } // Get returns the election record from a ConfigMap Annotation -func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, error) { +func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, []byte, error) { var record LeaderElectionRecord var err error cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name, metav1.GetOptions{}) if err != nil { - return nil, err + return nil, nil, err } if cml.cm.Annotations == nil { cml.cm.Annotations = make(map[string]string) } - if recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]; found { + recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey] + if found { if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { - return nil, err + return nil, nil, err } } - return &record, nil + return &record, []byte(recordBytes), nil } // Create attempts to create a LeaderElectionRecord annotation @@ -106,7 +107,7 @@ func (cml *ConfigMapLock) Describe() string { return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name) } -// returns the Identity of the lock +// Identity returns the Identity of the lock func (cml *ConfigMapLock) Identity() string { return cml.LockConfig.Identity } diff --git a/tools/leaderelection/resourcelock/endpointslock.go b/tools/leaderelection/resourcelock/endpointslock.go index bfe5e8b1..e01b0d49 100644 --- a/tools/leaderelection/resourcelock/endpointslock.go +++ b/tools/leaderelection/resourcelock/endpointslock.go @@ -36,22 +36,23 @@ type EndpointsLock struct { } // Get returns the election record from a Endpoints Annotation -func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) { +func (el *EndpointsLock) Get() (*LeaderElectionRecord, []byte, error) { var record LeaderElectionRecord var err error el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{}) if err != nil { - return nil, err + return nil, nil, err } if el.e.Annotations == nil { el.e.Annotations = make(map[string]string) } - if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found { + recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey] + if found { if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { - return nil, err + return nil, nil, err } } - return &record, nil + return &record, []byte(recordBytes), nil } // Create attempts to create a LeaderElectionRecord annotation @@ -101,7 +102,7 @@ func (el *EndpointsLock) Describe() string { return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name) } -// returns the Identity of the lock +// Identity returns the Identity of the lock func (el *EndpointsLock) Identity() string { return el.LockConfig.Identity } diff --git a/tools/leaderelection/resourcelock/interface.go b/tools/leaderelection/resourcelock/interface.go index 050d41a2..c9f17591 100644 --- a/tools/leaderelection/resourcelock/interface.go +++ b/tools/leaderelection/resourcelock/interface.go @@ -30,6 +30,8 @@ const ( EndpointsResourceLock = "endpoints" ConfigMapsResourceLock = "configmaps" LeasesResourceLock = "leases" + EndpointsLeasesResourceLock = "endpointsleases" + ConfigMapsLeasesResourceLock = "configmapsleases" ) // LeaderElectionRecord is the record that is stored in the leader election annotation. @@ -71,7 +73,7 @@ type ResourceLockConfig struct { // by the leaderelection code. type Interface interface { // Get returns the LeaderElectionRecord - Get() (*LeaderElectionRecord, error) + Get() (*LeaderElectionRecord, []byte, error) // Create attempts to create a LeaderElectionRecord Create(ler LeaderElectionRecord) error @@ -92,33 +94,46 @@ type Interface interface { // Manufacture will create a lock of a given type according to the input parameters func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) { + endpointsLock := &EndpointsLock{ + EndpointsMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Client: coreClient, + LockConfig: rlc, + } + configmapLock := &ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Client: coreClient, + LockConfig: rlc, + } + leaseLock := &LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Client: coordinationClient, + LockConfig: rlc, + } switch lockType { case EndpointsResourceLock: - return &EndpointsLock{ - EndpointsMeta: metav1.ObjectMeta{ - Namespace: ns, - Name: name, - }, - Client: coreClient, - LockConfig: rlc, - }, nil + return endpointsLock, nil case ConfigMapsResourceLock: - return &ConfigMapLock{ - ConfigMapMeta: metav1.ObjectMeta{ - Namespace: ns, - Name: name, - }, - Client: coreClient, - LockConfig: rlc, - }, nil + return configmapLock, nil case LeasesResourceLock: - return &LeaseLock{ - LeaseMeta: metav1.ObjectMeta{ - Namespace: ns, - Name: name, - }, - Client: coordinationClient, - LockConfig: rlc, + return leaseLock, nil + case EndpointsLeasesResourceLock: + return &MultiLock{ + Primary: endpointsLock, + Secondary: leaseLock, + }, nil + case ConfigMapsLeasesResourceLock: + return &MultiLock{ + Primary: configmapLock, + Secondary: leaseLock, }, nil default: return nil, fmt.Errorf("Invalid lock-type %s", lockType) diff --git a/tools/leaderelection/resourcelock/leaselock.go b/tools/leaderelection/resourcelock/leaselock.go index 285f9440..1f8c875d 100644 --- a/tools/leaderelection/resourcelock/leaselock.go +++ b/tools/leaderelection/resourcelock/leaselock.go @@ -17,6 +17,7 @@ limitations under the License. package resourcelock import ( + "encoding/json" "errors" "fmt" @@ -36,13 +37,18 @@ type LeaseLock struct { } // Get returns the election record from a Lease spec -func (ll *LeaseLock) Get() (*LeaderElectionRecord, error) { +func (ll *LeaseLock) Get() (*LeaderElectionRecord, []byte, error) { var err error ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ll.LeaseMeta.Name, metav1.GetOptions{}) if err != nil { - return nil, err + return nil, nil, err } - return LeaseSpecToLeaderElectionRecord(&ll.lease.Spec), nil + record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec) + recordByte, err := json.Marshal(*record) + if err != nil { + return nil, nil, err + } + return record, recordByte, nil } // Create attempts to create a Lease @@ -84,7 +90,7 @@ func (ll *LeaseLock) Describe() string { return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name) } -// returns the Identity of the lock +// Identity returns the Identity of the lock func (ll *LeaseLock) Identity() string { return ll.LockConfig.Identity } diff --git a/tools/leaderelection/resourcelock/multilock.go b/tools/leaderelection/resourcelock/multilock.go new file mode 100644 index 00000000..8cb89dc4 --- /dev/null +++ b/tools/leaderelection/resourcelock/multilock.go @@ -0,0 +1,103 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcelock + +import ( + "bytes" + "encoding/json" + + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +const ( + UnknownLeader = "leaderelection.k8s.io/unknown" +) + +// MultiLock is used for lock's migration +type MultiLock struct { + Primary Interface + Secondary Interface +} + +// Get returns the older election record of the lock +func (ml *MultiLock) Get() (*LeaderElectionRecord, []byte, error) { + primary, primaryRaw, err := ml.Primary.Get() + if err != nil { + return nil, nil, err + } + + secondary, secondaryRaw, err := ml.Secondary.Get() + if err != nil { + // Lock is held by old client + if apierrors.IsNotFound(err) && primary.HolderIdentity != ml.Identity() { + return primary, primaryRaw, nil + } + return nil, nil, err + } + + if primary.HolderIdentity != secondary.HolderIdentity { + primary.HolderIdentity = UnknownLeader + primaryRaw, err = json.Marshal(primary) + if err != nil { + return nil, nil, err + } + } + return primary, ConcatRawRecord(primaryRaw, secondaryRaw), nil +} + +// Create attempts to create both primary lock and secondary lock +func (ml *MultiLock) Create(ler LeaderElectionRecord) error { + err := ml.Primary.Create(ler) + if err != nil && !apierrors.IsAlreadyExists(err) { + return err + } + return ml.Secondary.Create(ler) +} + +// Update will update and existing annotation on both two resources. +func (ml *MultiLock) Update(ler LeaderElectionRecord) error { + err := ml.Primary.Update(ler) + if err != nil { + return err + } + _, _, err = ml.Secondary.Get() + if err != nil && apierrors.IsNotFound(err) { + return ml.Secondary.Create(ler) + } + return ml.Secondary.Update(ler) +} + +// RecordEvent in leader election while adding meta-data +func (ml *MultiLock) RecordEvent(s string) { + ml.Primary.RecordEvent(s) + ml.Secondary.RecordEvent(s) +} + +// Describe is used to convert details on current resource lock +// into a string +func (ml *MultiLock) Describe() string { + return ml.Primary.Describe() +} + +// Identity returns the Identity of the lock +func (ml *MultiLock) Identity() string { + return ml.Primary.Identity() +} + +func ConcatRawRecord(primaryRaw, secondaryRaw []byte) []byte { + return bytes.Join([][]byte{primaryRaw, secondaryRaw}, []byte(",")) +}