diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go index c286b257..d2fe5964 100644 --- a/tools/leaderelection/leaderelection_test.go +++ b/tools/leaderelection/leaderelection_test.go @@ -17,28 +17,47 @@ limitations under the License. package leaderelection import ( + "encoding/json" "fmt" "sync" "testing" "time" - "k8s.io/api/core/v1" + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" - fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake" - core "k8s.io/client-go/testing" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/client-go/kubernetes/fake" + fakeclient "k8s.io/client-go/testing" rl "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" ) -func createLockObject(objectType string, objectMeta metav1.ObjectMeta) (obj runtime.Object) { +func createLockObject(objectType, namespace, name string, record rl.LeaderElectionRecord) (obj runtime.Object) { + objectMeta := metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + } switch objectType { case "endpoints": - obj = &v1.Endpoints{ObjectMeta: objectMeta} + recordBytes, _ := json.Marshal(record) + objectMeta.Annotations = map[string]string{ + rl.LeaderElectionRecordAnnotationKey: string(recordBytes), + } + obj = &corev1.Endpoints{ObjectMeta: objectMeta} case "configmaps": - obj = &v1.ConfigMap{ObjectMeta: objectMeta} + recordBytes, _ := json.Marshal(record) + objectMeta.Annotations = map[string]string{ + rl.LeaderElectionRecordAnnotationKey: string(recordBytes), + } + obj = &corev1.ConfigMap{ObjectMeta: objectMeta} + case "leases": + spec := rl.LeaderElectionRecordToLeaseSpec(&record) + obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec} default: panic("unexpected objType:" + objectType) } @@ -60,7 +79,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { observedTime time.Time reactors []struct { verb string - reaction core.ReactionFunc + reaction fakeclient.ReactionFunc } expectSuccess bool @@ -71,18 +90,18 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { name: "acquire from no object", reactors: []struct { verb string - reaction core.ReactionFunc + reaction fakeclient.ReactionFunc }{ { verb: "get", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - return true, nil, errors.NewNotFound(action.(core.GetAction).GetResource().GroupResource(), action.(core.GetAction).GetName()) + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName()) }, }, { verb: "create", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(core.CreateAction).GetObject(), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil }, }, }, @@ -93,22 +112,18 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { name: "acquire from unled object", reactors: []struct { verb string - reaction core.ReactionFunc + reaction fakeclient.ReactionFunc }{ { verb: "get", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - objectMeta := metav1.ObjectMeta{ - Namespace: action.GetNamespace(), - Name: action.(core.GetAction).GetName(), - } - return true, createLockObject(objectType, objectMeta), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{}), nil }, }, { verb: "update", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(core.CreateAction).GetObject(), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil }, }, }, @@ -121,25 +136,18 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { name: "acquire from led, unacked object", reactors: []struct { verb string - reaction core.ReactionFunc + reaction fakeclient.ReactionFunc }{ { verb: "get", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - objectMeta := metav1.ObjectMeta{ - Namespace: action.GetNamespace(), - Name: action.(core.GetAction).GetName(), - Annotations: map[string]string{ - rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, - }, - } - return true, createLockObject(objectType, objectMeta), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil }, }, { verb: "update", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(core.CreateAction).GetObject(), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil }, }, }, @@ -154,25 +162,18 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { name: "acquire from empty led, acked object", reactors: []struct { verb string - reaction core.ReactionFunc + reaction fakeclient.ReactionFunc }{ { verb: "get", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - objectMeta := metav1.ObjectMeta{ - Namespace: action.GetNamespace(), - Name: action.(core.GetAction).GetName(), - Annotations: map[string]string{ - rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":""}`, - }, - } - return true, createLockObject(objectType, objectMeta), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: ""}), nil }, }, { verb: "update", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(core.CreateAction).GetObject(), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil }, }, }, @@ -186,19 +187,12 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { name: "don't acquire from led, acked object", reactors: []struct { verb string - reaction core.ReactionFunc + reaction fakeclient.ReactionFunc }{ { verb: "get", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - objectMeta := metav1.ObjectMeta{ - Namespace: action.GetNamespace(), - Name: action.(core.GetAction).GetName(), - Annotations: map[string]string{ - rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, - }, - } - return true, createLockObject(objectType, objectMeta), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil }, }, }, @@ -211,25 +205,18 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { name: "renew already acquired object", reactors: []struct { verb string - reaction core.ReactionFunc + reaction fakeclient.ReactionFunc }{ { verb: "get", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - objectMeta := metav1.ObjectMeta{ - Namespace: action.GetNamespace(), - Name: action.(core.GetAction).GetName(), - Annotations: map[string]string{ - rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`, - }, - } - return true, createLockObject(objectType, objectMeta), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil }, }, { verb: "update", - reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(core.CreateAction).GetObject(), nil + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil }, }, }, @@ -255,11 +242,11 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { Identity: "baz", EventRecorder: &record.FakeRecorder{}, } - c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} + c := &fake.Clientset{} for _, reactor := range test.reactors { c.AddReactor(reactor.verb, objectType, reactor.reaction) } - c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) { t.Errorf("unreachable action. testclient called too many times: %+v", action) return true, nil, fmt.Errorf("unreachable action") }) @@ -269,13 +256,19 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { lock = &rl.EndpointsLock{ EndpointsMeta: objectMeta, LockConfig: resourceLockConfig, - Client: c, + Client: c.CoreV1(), } case "configmaps": lock = &rl.ConfigMapLock{ ConfigMapMeta: objectMeta, LockConfig: resourceLockConfig, - Client: c, + Client: c.CoreV1(), + } + case "leases": + lock = &rl.LeaseLock{ + LeaseMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c.CoordinationV1(), } } @@ -328,3 +321,34 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { func TestTryAcquireOrRenewConfigMaps(t *testing.T) { testTryAcquireOrRenew(t, "configmaps") } + +// Will test leader election using lease as the resource +func TestTryAcquireOrRenewLeases(t *testing.T) { + testTryAcquireOrRenew(t, "leases") +} + +func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) { + holderIdentity := "foo" + leaseDurationSeconds := int32(10) + leaseTransitions := int32(1) + oldSpec := coordinationv1.LeaseSpec{ + HolderIdentity: &holderIdentity, + LeaseDurationSeconds: &leaseDurationSeconds, + AcquireTime: &metav1.MicroTime{time.Now()}, + RenewTime: &metav1.MicroTime{time.Now()}, + LeaseTransitions: &leaseTransitions, + } + + oldRecord := rl.LeaseSpecToLeaderElectionRecord(&oldSpec) + newSpec := rl.LeaderElectionRecordToLeaseSpec(oldRecord) + + if !equality.Semantic.DeepEqual(oldSpec, newSpec) { + t.Errorf("diff: %v", diff.ObjectReflectDiff(oldSpec, newSpec)) + } + + newRecord := rl.LeaseSpecToLeaderElectionRecord(&newSpec) + + if !equality.Semantic.DeepEqual(oldRecord, newRecord) { + t.Errorf("diff: %v", diff.ObjectReflectDiff(oldRecord, newRecord)) + } +} diff --git a/tools/leaderelection/resourcelock/interface.go b/tools/leaderelection/resourcelock/interface.go index 0bf8e9cd..050d41a2 100644 --- a/tools/leaderelection/resourcelock/interface.go +++ b/tools/leaderelection/resourcelock/interface.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -28,6 +29,7 @@ const ( LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader" EndpointsResourceLock = "endpoints" ConfigMapsResourceLock = "configmaps" + LeasesResourceLock = "leases" ) // LeaderElectionRecord is the record that is stored in the leader election annotation. @@ -89,7 +91,7 @@ type Interface interface { } // Manufacture will create a lock of a given type according to the input parameters -func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) { +func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) { switch lockType { case EndpointsResourceLock: return &EndpointsLock{ @@ -97,7 +99,7 @@ func New(lockType string, ns string, name string, client corev1.CoreV1Interface, Namespace: ns, Name: name, }, - Client: client, + Client: coreClient, LockConfig: rlc, }, nil case ConfigMapsResourceLock: @@ -106,7 +108,16 @@ func New(lockType string, ns string, name string, client corev1.CoreV1Interface, Namespace: ns, Name: name, }, - Client: client, + Client: coreClient, + LockConfig: rlc, + }, nil + case LeasesResourceLock: + return &LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Client: coordinationClient, LockConfig: rlc, }, nil default: diff --git a/tools/leaderelection/resourcelock/leaselock.go b/tools/leaderelection/resourcelock/leaselock.go new file mode 100644 index 00000000..285f9440 --- /dev/null +++ b/tools/leaderelection/resourcelock/leaselock.go @@ -0,0 +1,124 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcelock + +import ( + "errors" + "fmt" + + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" +) + +type LeaseLock struct { + // LeaseMeta should contain a Name and a Namespace of a + // LeaseMeta object that the LeaderElector will attempt to lead. + LeaseMeta metav1.ObjectMeta + Client coordinationv1client.LeasesGetter + LockConfig ResourceLockConfig + lease *coordinationv1.Lease +} + +// Get returns the election record from a Lease spec +func (ll *LeaseLock) Get() (*LeaderElectionRecord, error) { + var err error + ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ll.LeaseMeta.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return LeaseSpecToLeaderElectionRecord(&ll.lease.Spec), nil +} + +// Create attempts to create a Lease +func (ll *LeaseLock) Create(ler LeaderElectionRecord) error { + var err error + ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(&coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: ll.LeaseMeta.Name, + Namespace: ll.LeaseMeta.Namespace, + }, + Spec: LeaderElectionRecordToLeaseSpec(&ler), + }) + return err +} + +// Update will update an existing Lease spec. +func (ll *LeaseLock) Update(ler LeaderElectionRecord) error { + if ll.lease == nil { + return errors.New("lease not initialized, call get or create first") + } + ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler) + var err error + ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ll.lease) + return err +} + +// RecordEvent in leader election while adding meta-data +func (ll *LeaseLock) RecordEvent(s string) { + if ll.LockConfig.EventRecorder == nil { + return + } + events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s) + ll.LockConfig.EventRecorder.Eventf(&coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}, corev1.EventTypeNormal, "LeaderElection", events) +} + +// Describe is used to convert details on current resource lock +// into a string +func (ll *LeaseLock) Describe() string { + return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name) +} + +// returns the Identity of the lock +func (ll *LeaseLock) Identity() string { + return ll.LockConfig.Identity +} + +func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord { + holderIdentity := "" + if spec.HolderIdentity != nil { + holderIdentity = *spec.HolderIdentity + } + leaseDurationSeconds := 0 + if spec.LeaseDurationSeconds != nil { + leaseDurationSeconds = int(*spec.LeaseDurationSeconds) + } + leaseTransitions := 0 + if spec.LeaseTransitions != nil { + leaseTransitions = int(*spec.LeaseTransitions) + } + return &LeaderElectionRecord{ + HolderIdentity: holderIdentity, + LeaseDurationSeconds: leaseDurationSeconds, + AcquireTime: metav1.Time{spec.AcquireTime.Time}, + RenewTime: metav1.Time{spec.RenewTime.Time}, + LeaderTransitions: leaseTransitions, + } +} + +func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec { + leaseDurationSeconds := int32(ler.LeaseDurationSeconds) + leaseTransitions := int32(ler.LeaderTransitions) + return coordinationv1.LeaseSpec{ + HolderIdentity: &ler.HolderIdentity, + LeaseDurationSeconds: &leaseDurationSeconds, + AcquireTime: &metav1.MicroTime{ler.AcquireTime.Time}, + RenewTime: &metav1.MicroTime{ler.RenewTime.Time}, + LeaseTransitions: &leaseTransitions, + } +}