Remove support for EndpointsLeases and ConfigMapsLeases lock from leader election

Kubernetes-commit: 2bd42061b6e8035c3def54545ccc4baf95bd6586
This commit is contained in:
Wojciech Tyczyński 2023-04-24 15:08:10 +02:00 committed by Kubernetes Publisher
parent b26f379fe1
commit 27bbe76535
4 changed files with 11 additions and 801 deletions

View File

@ -381,541 +381,20 @@ func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) {
}
}
func multiLockType(t *testing.T, objectType string) (primaryType, secondaryType string) {
switch objectType {
case rl.EndpointsLeasesResourceLock:
return "endpoints", rl.LeasesResourceLock
case rl.ConfigMapsLeasesResourceLock:
return "configmaps", rl.LeasesResourceLock
default:
t.Fatal("unexpected objType:" + objectType)
}
return
}
func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRecord) (ret []byte) {
var err error
switch objectType {
case "endpoints", "configmaps", "leases":
case "leases":
ret, err = json.Marshal(ler)
if err != nil {
t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
}
case "endpointsleases", "configmapsleases":
recordBytes, err := json.Marshal(ler)
if err != nil {
t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
}
ret = rl.ConcatRawRecord(recordBytes, recordBytes)
default:
t.Fatal("unexpected objType:" + objectType)
}
return
}
func testTryAcquireOrRenewMultiLock(t *testing.T, objectType string) {
clock := clock.RealClock{}
future := clock.Now().Add(1000 * time.Hour)
past := clock.Now().Add(-1000 * time.Hour)
primaryType, secondaryType := multiLockType(t, objectType)
tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedRawRecord []byte
observedTime time.Time
reactors []Reactor
expectedEvents []string
expectSuccess bool
transitionLeader bool
outHolder string
}{
{
name: "acquire from no object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
expectSuccess: true,
outHolder: "baz",
},
{
name: "acquire from unled old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from unled transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from led, unack old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "create",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.CreateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from led, unack transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "acquire from conflict led, ack transition object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: rl.UnknownLeader,
},
{
name: "acquire from led, unack unknown object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: rl.UnknownLeader}),
observedTime: past,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "don't acquire from led, ack old object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, primaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: "bing",
},
{
name: "don't acquire from led, acked new object, observe new record",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, secondaryType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: rl.UnknownLeader,
},
{
name: "don't acquire from led, acked new object, observe transition record",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "bing"}),
observedTime: future,
expectSuccess: false,
outHolder: "bing",
},
{
name: "renew already required object",
reactors: []Reactor{
{
verb: "get",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, primaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "update",
objectType: primaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
{
verb: "get",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, createLockObject(t, secondaryType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
},
},
{
verb: "update",
objectType: secondaryType,
reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(fakeclient.UpdateAction).GetObject(), nil
},
},
},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
observedRawRecord: GetRawRecordOrDie(t, objectType, rl.LeaderElectionRecord{HolderIdentity: "baz"}),
observedTime: future,
expectSuccess: true,
outHolder: "baz",
},
}
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
recorder := record.NewFakeRecorder(100)
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: recorder,
}
c := &fake.Clientset{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, reactor.objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
if err != nil {
t.Fatalf("Couldn't create lock: %v", err)
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
},
},
}
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedRawRecord: test.observedRawRecord,
observedTime: test.observedTime,
clock: clock,
}
if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{}
if le.observedRecord.HolderIdentity != test.outHolder {
t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
}
if len(test.reactors) != len(c.Actions()) {
t.Errorf("wrong number of api interactions")
}
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
t.Errorf("leader should have transitioned but did not")
}
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("leader should not have transitioned but did")
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
}
assertEqualEvents(t, test.expectedEvents, recorder.Events)
})
}
}
// Will test leader election using endpointsleases as the resource
func TestTryAcquireOrRenewEndpointsLeases(t *testing.T) {
testTryAcquireOrRenewMultiLock(t, "endpointsleases")
}
// Will test leader election using configmapsleases as the resource
func TestTryAcquireOrRenewConfigMapsLeases(t *testing.T) {
testTryAcquireOrRenewMultiLock(t, "configmapsleases")
}
func testReleaseLease(t *testing.T, objectType string) {
tests := []struct {
name string

View File

@ -1,126 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcelock
import (
"context"
"encoding/json"
"errors"
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
// TODO: This is almost a exact replica of Endpoints lock.
// going forwards as we self host more and more components
// and use ConfigMaps as the means to pass that configuration
// data we will likely move to deprecate the Endpoints lock.
type configMapLock struct {
// ConfigMapMeta should contain a Name and a Namespace of a
// ConfigMapMeta object that the LeaderElector will attempt to lead.
ConfigMapMeta metav1.ObjectMeta
Client corev1client.ConfigMapsGetter
LockConfig ResourceLockConfig
cm *v1.ConfigMap
}
// Get returns the election record from a ConfigMap Annotation
func (cml *configMapLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord
cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(ctx, cml.ConfigMapMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
cml.cm = cm
if cml.cm.Annotations == nil {
cml.cm.Annotations = make(map[string]string)
}
recordStr, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]
recordBytes := []byte(recordStr)
if found {
if err := json.Unmarshal(recordBytes, &record); err != nil {
return nil, nil, err
}
}
return &record, recordBytes, nil
}
// Create attempts to create a LeaderElectionRecord annotation
func (cml *configMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cml.ConfigMapMeta.Name,
Namespace: cml.ConfigMapMeta.Namespace,
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: string(recordBytes),
},
},
}, metav1.CreateOptions{})
return err
}
// Update will update an existing annotation on a given resource.
func (cml *configMapLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if cml.cm == nil {
return errors.New("configmap not initialized, call get or create first")
}
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
if cml.cm.Annotations == nil {
cml.cm.Annotations = make(map[string]string)
}
cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
cm, err := cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(ctx, cml.cm, metav1.UpdateOptions{})
if err != nil {
return err
}
cml.cm = cm
return nil
}
// RecordEvent in leader election while adding meta-data
func (cml *configMapLock) RecordEvent(s string) {
if cml.LockConfig.EventRecorder == nil {
return
}
events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s)
subject := &v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}
// Populate the type meta, so we don't have to get it from the schema
subject.Kind = "ConfigMap"
subject.APIVersion = v1.SchemeGroupVersion.String()
cml.LockConfig.EventRecorder.Eventf(subject, v1.EventTypeNormal, "LeaderElection", events)
}
// Describe is used to convert details on current resource lock
// into a string
func (cml *configMapLock) Describe() string {
return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name)
}
// Identity returns the Identity of the lock
func (cml *configMapLock) Identity() string {
return cml.LockConfig.Identity
}

View File

@ -1,121 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcelock
import (
"context"
"encoding/json"
"errors"
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
type endpointsLock struct {
// EndpointsMeta should contain a Name and a Namespace of an
// Endpoints object that the LeaderElector will attempt to lead.
EndpointsMeta metav1.ObjectMeta
Client corev1client.EndpointsGetter
LockConfig ResourceLockConfig
e *v1.Endpoints
}
// Get returns the election record from a Endpoints Annotation
func (el *endpointsLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
var record LeaderElectionRecord
ep, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(ctx, el.EndpointsMeta.Name, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}
el.e = ep
if el.e.Annotations == nil {
el.e.Annotations = make(map[string]string)
}
recordStr, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]
recordBytes := []byte(recordStr)
if found {
if err := json.Unmarshal(recordBytes, &record); err != nil {
return nil, nil, err
}
}
return &record, recordBytes, nil
}
// Create attempts to create a LeaderElectionRecord annotation
func (el *endpointsLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(ctx, &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: el.EndpointsMeta.Name,
Namespace: el.EndpointsMeta.Namespace,
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: string(recordBytes),
},
},
}, metav1.CreateOptions{})
return err
}
// Update will update and existing annotation on a given resource.
func (el *endpointsLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
if el.e == nil {
return errors.New("endpoint not initialized, call get or create first")
}
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
if el.e.Annotations == nil {
el.e.Annotations = make(map[string]string)
}
el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
e, err := el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(ctx, el.e, metav1.UpdateOptions{})
if err != nil {
return err
}
el.e = e
return nil
}
// RecordEvent in leader election while adding meta-data
func (el *endpointsLock) RecordEvent(s string) {
if el.LockConfig.EventRecorder == nil {
return
}
events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s)
subject := &v1.Endpoints{ObjectMeta: el.e.ObjectMeta}
// Populate the type meta, so we don't have to get it from the schema
subject.Kind = "Endpoints"
subject.APIVersion = v1.SchemeGroupVersion.String()
el.LockConfig.EventRecorder.Eventf(subject, v1.EventTypeNormal, "LeaderElection", events)
}
// Describe is used to convert details on current resource lock
// into a string
func (el *endpointsLock) Describe() string {
return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name)
}
// Identity returns the Identity of the lock
func (el *endpointsLock) Identity() string {
return el.LockConfig.Identity
}

View File

@ -34,7 +34,7 @@ const (
endpointsResourceLock = "endpoints"
configMapsResourceLock = "configmaps"
LeasesResourceLock = "leases"
// When using EndpointsLeasesResourceLock, you need to ensure that
// When using endpointsLeasesResourceLock, you need to ensure that
// API Priority & Fairness is configured with non-default flow-schema
// that will catch the necessary operations on leader-election related
// endpoint objects.
@ -67,8 +67,8 @@ const (
// serviceAccount:
// name: '*'
// namespace: kube-system
EndpointsLeasesResourceLock = "endpointsleases"
// When using ConfigMapsLeasesResourceLock, you need to ensure that
endpointsLeasesResourceLock = "endpointsleases"
// When using configMapsLeasesResourceLock, you need to ensure that
// API Priority & Fairness is configured with non-default flow-schema
// that will catch the necessary operations on leader-election related
// configmap objects.
@ -101,7 +101,7 @@ const (
// serviceAccount:
// name: '*'
// namespace: kube-system
ConfigMapsLeasesResourceLock = "configmapsleases"
configMapsLeasesResourceLock = "configmapsleases"
)
// LeaderElectionRecord is the record that is stored in the leader election annotation.
@ -164,22 +164,6 @@ type Interface interface {
// Manufacture will create a lock of a given type according to the input parameters
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
endpointsLock := &endpointsLock{
EndpointsMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coreClient,
LockConfig: rlc,
}
configmapLock := &configMapLock{
ConfigMapMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coreClient,
LockConfig: rlc,
}
leaseLock := &LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: ns,
@ -190,21 +174,15 @@ func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interf
}
switch lockType {
case endpointsResourceLock:
return nil, fmt.Errorf("endpoints lock is removed, migrate to %s", EndpointsLeasesResourceLock)
return nil, fmt.Errorf("endpoints lock is removed, migrate to %s (using version v0.27.x)", endpointsLeasesResourceLock)
case configMapsResourceLock:
return nil, fmt.Errorf("configmaps lock is removed, migrate to %s", ConfigMapsLeasesResourceLock)
return nil, fmt.Errorf("configmaps lock is removed, migrate to %s (using version v0.27.x)", configMapsLeasesResourceLock)
case LeasesResourceLock:
return leaseLock, nil
case EndpointsLeasesResourceLock:
return &MultiLock{
Primary: endpointsLock,
Secondary: leaseLock,
}, nil
case ConfigMapsLeasesResourceLock:
return &MultiLock{
Primary: configmapLock,
Secondary: leaseLock,
}, nil
case endpointsLeasesResourceLock:
return nil, fmt.Errorf("endpointsleases lock is removed, migrate to %s", LeasesResourceLock)
case configMapsLeasesResourceLock:
return nil, fmt.Errorf("configmapsleases lock is removed, migrated to %s", LeasesResourceLock)
default:
return nil, fmt.Errorf("Invalid lock-type %s", lockType)
}