mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #33387 from timothysc/resource_lock_abstraction
Automatic merge from submit-queue Abstraction of endpoints in leaderelection code **Problem Statement**: Currently the Leader Election code is hard coded against the endpoints api. This causes performance issues on large scale clusters due to incessant iptables refreshes, see: https://github.com/kubernetes/kubernetes/issues/26637 The goal of this PR is to: - Abstract Endpoints out of the leader election code - Fix a known bug in the event recording fixes #18386 **Special notes for your reviewer**: This is a 1st pass at abstracting the details of endpoints out into an interface. Any suggestions around how we we want to refactor this interface is welcome and could be addressed in either this PR or follow on PR. /cc @ncdc @wojtek-t @rrati
This commit is contained in:
commit
901e3e30b6
@ -41,6 +41,7 @@ import (
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/typed/dynamic"
|
||||
@ -179,14 +180,21 @@ func Run(s *options.CMServer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
|
||||
// TODO: enable other lock types
|
||||
rl := resourcelock.EndpointsLock{
|
||||
EndpointsMeta: api.ObjectMeta{
|
||||
Namespace: "kube-system",
|
||||
Name: "kube-controller-manager",
|
||||
},
|
||||
Client: leaderElectionClient,
|
||||
Identity: id,
|
||||
EventRecorder: recorder,
|
||||
Client: leaderElectionClient,
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: id,
|
||||
EventRecorder: recorder,
|
||||
},
|
||||
}
|
||||
|
||||
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
|
||||
Lock: &rl,
|
||||
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
|
||||
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
|
||||
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
|
||||
|
@ -49,17 +49,14 @@ limitations under the License.
|
||||
package leaderelection
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
rl "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||
"k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
||||
@ -68,10 +65,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
JitterFactor = 1.2
|
||||
|
||||
LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
|
||||
|
||||
JitterFactor = 1.2
|
||||
DefaultLeaseDuration = 15 * time.Second
|
||||
DefaultRenewDeadline = 10 * time.Second
|
||||
DefaultRetryPeriod = 2 * time.Second
|
||||
@ -85,11 +79,8 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
|
||||
if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
|
||||
return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
|
||||
}
|
||||
if lec.Client == nil {
|
||||
return nil, fmt.Errorf("EndpointsClient must not be nil.")
|
||||
}
|
||||
if lec.EventRecorder == nil {
|
||||
return nil, fmt.Errorf("EventRecorder must not be nil.")
|
||||
if lec.Lock == nil {
|
||||
return nil, fmt.Errorf("Lock must not be nil.")
|
||||
}
|
||||
return &LeaderElector{
|
||||
config: lec,
|
||||
@ -97,14 +88,8 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
|
||||
}
|
||||
|
||||
type LeaderElectionConfig struct {
|
||||
// EndpointsMeta should contain a Name and a Namespace of an
|
||||
// Endpoints object that the LeaderElector will attempt to lead.
|
||||
EndpointsMeta api.ObjectMeta
|
||||
// Identity is a unique identifier of the leader elector.
|
||||
Identity string
|
||||
|
||||
Client clientset.Interface
|
||||
EventRecorder record.EventRecorder
|
||||
// Lock is the resource that will be used for locking
|
||||
Lock rl.Interface
|
||||
|
||||
// LeaseDuration is the duration that non-leader candidates will
|
||||
// wait to force acquire leadership. This is measured against time of
|
||||
@ -146,7 +131,7 @@ type LeaderCallbacks struct {
|
||||
type LeaderElector struct {
|
||||
config LeaderElectionConfig
|
||||
// internal bookkeeping
|
||||
observedRecord LeaderElectionRecord
|
||||
observedRecord rl.LeaderElectionRecord
|
||||
observedTime time.Time
|
||||
// used to implement OnNewLeader(), may lag slightly from the
|
||||
// value observedRecord.HolderIdentity if the transition has
|
||||
@ -154,18 +139,6 @@ type LeaderElector struct {
|
||||
reportedLeader string
|
||||
}
|
||||
|
||||
// LeaderElectionRecord is the record that is stored in the leader election annotation.
|
||||
// This information should be used for observational purposes only and could be replaced
|
||||
// with a random string (e.g. UUID) with only slight modification of this code.
|
||||
// TODO(mikedanese): this should potentially be versioned
|
||||
type LeaderElectionRecord struct {
|
||||
HolderIdentity string `json:"holderIdentity"`
|
||||
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
|
||||
AcquireTime unversioned.Time `json:"acquireTime"`
|
||||
RenewTime unversioned.Time `json:"renewTime"`
|
||||
LeaderTransitions int `json:"leaderTransitions"`
|
||||
}
|
||||
|
||||
// Run starts the leader election loop
|
||||
func (le *LeaderElector) Run() {
|
||||
defer func() {
|
||||
@ -197,7 +170,7 @@ func (le *LeaderElector) GetLeader() string {
|
||||
|
||||
// IsLeader returns true if the last observed leader was this client else returns false.
|
||||
func (le *LeaderElector) IsLeader() bool {
|
||||
return le.observedRecord.HolderIdentity == le.config.Identity
|
||||
return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
|
||||
}
|
||||
|
||||
// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds.
|
||||
@ -206,12 +179,13 @@ func (le *LeaderElector) acquire() {
|
||||
wait.JitterUntil(func() {
|
||||
succeeded := le.tryAcquireOrRenew()
|
||||
le.maybeReportTransition()
|
||||
desc := le.config.Lock.Describe()
|
||||
if !succeeded {
|
||||
glog.V(4).Infof("failed to renew lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
|
||||
glog.V(4).Infof("failed to renew lease %v", desc)
|
||||
return
|
||||
}
|
||||
le.config.EventRecorder.Eventf(&api.Endpoints{ObjectMeta: le.config.EndpointsMeta}, api.EventTypeNormal, "%v became leader", le.config.Identity)
|
||||
glog.Infof("sucessfully acquired lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
|
||||
le.config.Lock.RecordEvent("became leader")
|
||||
glog.Infof("sucessfully acquired lease %v", desc)
|
||||
close(stop)
|
||||
}, le.config.RetryPeriod, JitterFactor, true, stop)
|
||||
}
|
||||
@ -224,12 +198,13 @@ func (le *LeaderElector) renew() {
|
||||
return le.tryAcquireOrRenew(), nil
|
||||
})
|
||||
le.maybeReportTransition()
|
||||
desc := le.config.Lock.Describe()
|
||||
if err == nil {
|
||||
glog.V(4).Infof("succesfully renewed lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
|
||||
glog.V(4).Infof("succesfully renewed lease %v", desc)
|
||||
return
|
||||
}
|
||||
le.config.EventRecorder.Eventf(&api.Endpoints{ObjectMeta: le.config.EndpointsMeta}, api.EventTypeNormal, "%v stopped leading", le.config.Identity)
|
||||
glog.Infof("failed to renew lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
|
||||
le.config.Lock.RecordEvent("stopped leading")
|
||||
glog.Infof("failed to renew lease %v", desc)
|
||||
close(stop)
|
||||
}, 0, stop)
|
||||
}
|
||||
@ -239,35 +214,22 @@ func (le *LeaderElector) renew() {
|
||||
// on success else returns false.
|
||||
func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||
now := unversioned.Now()
|
||||
leaderElectionRecord := LeaderElectionRecord{
|
||||
HolderIdentity: le.config.Identity,
|
||||
leaderElectionRecord := rl.LeaderElectionRecord{
|
||||
HolderIdentity: le.config.Lock.Identity(),
|
||||
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
|
||||
RenewTime: now,
|
||||
AcquireTime: now,
|
||||
}
|
||||
|
||||
e, err := le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name)
|
||||
// 1. obtain or create the ElectionRecord
|
||||
oldLeaderElectionRecord, err := le.config.Lock.Get()
|
||||
if err != nil {
|
||||
if !errors.IsNotFound(err) {
|
||||
glog.Errorf("error retrieving endpoint: %v", err)
|
||||
glog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
|
||||
return false
|
||||
}
|
||||
|
||||
leaderElectionRecordBytes, err := json.Marshal(leaderElectionRecord)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
_, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: le.config.EndpointsMeta.Name,
|
||||
Namespace: le.config.EndpointsMeta.Namespace,
|
||||
Annotations: map[string]string{
|
||||
LeaderElectionRecordAnnotationKey: string(leaderElectionRecordBytes),
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
glog.Errorf("error initially creating endpoints: %v", err)
|
||||
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
|
||||
glog.Errorf("error initially creating leader election record: %v", err)
|
||||
return false
|
||||
}
|
||||
le.observedRecord = leaderElectionRecord
|
||||
@ -275,46 +237,28 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
if e.Annotations == nil {
|
||||
e.Annotations = make(map[string]string)
|
||||
// 2. Record obtained, check the Identity & Time
|
||||
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
|
||||
le.observedRecord = *oldLeaderElectionRecord
|
||||
le.observedTime = time.Now()
|
||||
}
|
||||
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
|
||||
oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() {
|
||||
glog.Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
|
||||
return false
|
||||
}
|
||||
|
||||
var oldLeaderElectionRecord LeaderElectionRecord
|
||||
|
||||
if oldLeaderElectionRecordBytes, found := e.Annotations[LeaderElectionRecordAnnotationKey]; found {
|
||||
if err := json.Unmarshal([]byte(oldLeaderElectionRecordBytes), &oldLeaderElectionRecord); err != nil {
|
||||
glog.Errorf("error unmarshaling leader election record: %v", err)
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(le.observedRecord, oldLeaderElectionRecord) {
|
||||
le.observedRecord = oldLeaderElectionRecord
|
||||
le.observedTime = time.Now()
|
||||
}
|
||||
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
|
||||
oldLeaderElectionRecord.HolderIdentity != le.config.Identity {
|
||||
glog.Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// We're going to try to update. The leaderElectionRecord is set to it's default
|
||||
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
|
||||
// here. Let's correct it before updating.
|
||||
if oldLeaderElectionRecord.HolderIdentity == le.config.Identity {
|
||||
if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() {
|
||||
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
|
||||
} else {
|
||||
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
|
||||
}
|
||||
|
||||
leaderElectionRecordBytes, err := json.Marshal(leaderElectionRecord)
|
||||
if err != nil {
|
||||
glog.Errorf("err marshaling leader election record: %v", err)
|
||||
return false
|
||||
}
|
||||
e.Annotations[LeaderElectionRecordAnnotationKey] = string(leaderElectionRecordBytes)
|
||||
|
||||
_, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Update(e)
|
||||
if err != nil {
|
||||
glog.Errorf("err: %v", err)
|
||||
// update the lock itself
|
||||
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
|
||||
glog.Errorf("Failed to update lock: %v", err)
|
||||
return false
|
||||
}
|
||||
le.observedRecord = leaderElectionRecord
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
||||
rl "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/client/testing/core"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
@ -40,7 +41,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
|
||||
past := time.Now().Add(-1000 * time.Hour)
|
||||
|
||||
tests := []struct {
|
||||
observedRecord LeaderElectionRecord
|
||||
observedRecord rl.LeaderElectionRecord
|
||||
observedTime time.Time
|
||||
reactors []struct {
|
||||
verb string
|
||||
@ -116,7 +117,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
|
||||
Namespace: action.GetNamespace(),
|
||||
Name: action.(core.GetAction).GetName(),
|
||||
Annotations: map[string]string{
|
||||
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
|
||||
rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
@ -129,7 +130,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
observedRecord: LeaderElectionRecord{HolderIdentity: "bing"},
|
||||
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
|
||||
observedTime: past,
|
||||
|
||||
expectSuccess: true,
|
||||
@ -150,7 +151,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
|
||||
Namespace: action.GetNamespace(),
|
||||
Name: action.(core.GetAction).GetName(),
|
||||
Annotations: map[string]string{
|
||||
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
|
||||
rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
@ -176,7 +177,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
|
||||
Namespace: action.GetNamespace(),
|
||||
Name: action.(core.GetAction).GetName(),
|
||||
Annotations: map[string]string{
|
||||
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`,
|
||||
rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
@ -190,7 +191,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
|
||||
},
|
||||
},
|
||||
observedTime: future,
|
||||
observedRecord: LeaderElectionRecord{HolderIdentity: "baz"},
|
||||
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
|
||||
|
||||
expectSuccess: true,
|
||||
outHolder: "baz",
|
||||
@ -203,10 +204,16 @@ func TestTryAcquireOrRenew(t *testing.T) {
|
||||
wg.Add(1)
|
||||
var reportedLeader string
|
||||
|
||||
lec := LeaderElectionConfig{
|
||||
lock := rl.EndpointsLock{
|
||||
EndpointsMeta: api.ObjectMeta{Namespace: "foo", Name: "bar"},
|
||||
Identity: "baz",
|
||||
EventRecorder: &record.FakeRecorder{},
|
||||
LockConfig: rl.ResourceLockConfig{
|
||||
Identity: "baz",
|
||||
EventRecorder: &record.FakeRecorder{},
|
||||
},
|
||||
}
|
||||
|
||||
lec := LeaderElectionConfig{
|
||||
Lock: &lock,
|
||||
LeaseDuration: 10 * time.Second,
|
||||
Callbacks: LeaderCallbacks{
|
||||
OnNewLeader: func(l string) {
|
||||
@ -229,7 +236,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
|
||||
observedRecord: test.observedRecord,
|
||||
observedTime: test.observedTime,
|
||||
}
|
||||
le.config.Client = c
|
||||
lock.Client = c
|
||||
|
||||
if test.expectSuccess != le.tryAcquireOrRenew() {
|
||||
t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeded=%v]", i, !test.expectSuccess)
|
||||
|
102
pkg/client/leaderelection/resourcelock/endpointslock.go
Normal file
102
pkg/client/leaderelection/resourcelock/endpointslock.go
Normal file
@ -0,0 +1,102 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package resourcelock
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
)
|
||||
|
||||
type EndpointsLock struct {
|
||||
// EndpointsMeta should contain a Name and a Namespace of an
|
||||
// Endpoints object that the LeaderElector will attempt to lead.
|
||||
EndpointsMeta api.ObjectMeta
|
||||
Client clientset.Interface
|
||||
LockConfig ResourceLockConfig
|
||||
e *api.Endpoints
|
||||
}
|
||||
|
||||
func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) {
|
||||
var record LeaderElectionRecord
|
||||
var err error
|
||||
el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if el.e.Annotations == nil {
|
||||
el.e.Annotations = make(map[string]string)
|
||||
}
|
||||
if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found {
|
||||
if err := json.Unmarshal([]byte(recordBytes), &record); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &record, nil
|
||||
}
|
||||
|
||||
// Create attempts to create a LeaderElectionRecord annotation
|
||||
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
|
||||
recordBytes, err := json.Marshal(ler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Create(&api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: el.EndpointsMeta.Name,
|
||||
Namespace: el.EndpointsMeta.Namespace,
|
||||
Annotations: map[string]string{
|
||||
LeaderElectionRecordAnnotationKey: string(recordBytes),
|
||||
},
|
||||
},
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// Update will update and existing annotation on a given resource.
|
||||
func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
|
||||
if el.e == nil {
|
||||
return errors.New("endpoint not initialized, call get or create first")
|
||||
}
|
||||
recordBytes, err := json.Marshal(ler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
|
||||
el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Update(el.e)
|
||||
return err
|
||||
}
|
||||
|
||||
// RecordEvent in leader election while adding meta-data
|
||||
func (el *EndpointsLock) RecordEvent(s string) {
|
||||
events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s)
|
||||
el.LockConfig.EventRecorder.Eventf(&api.Endpoints{ObjectMeta: el.e.ObjectMeta}, api.EventTypeNormal, "LeaderElection", events)
|
||||
}
|
||||
|
||||
// Describe is used to convert details on current resource lock
|
||||
// into a string
|
||||
func (el *EndpointsLock) Describe() string {
|
||||
return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name)
|
||||
}
|
||||
|
||||
// returns the Identity of the lock
|
||||
func (el *EndpointsLock) Identity() string {
|
||||
return el.LockConfig.Identity
|
||||
}
|
71
pkg/client/leaderelection/resourcelock/interface.go
Normal file
71
pkg/client/leaderelection/resourcelock/interface.go
Normal file
@ -0,0 +1,71 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package resourcelock
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
)
|
||||
|
||||
const (
|
||||
LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
|
||||
)
|
||||
|
||||
// LeaderElectionRecord is the record that is stored in the leader election annotation.
|
||||
// This information should be used for observational purposes only and could be replaced
|
||||
// with a random string (e.g. UUID) with only slight modification of this code.
|
||||
// TODO(mikedanese): this should potentially be versioned
|
||||
type LeaderElectionRecord struct {
|
||||
HolderIdentity string `json:"holderIdentity"`
|
||||
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
|
||||
AcquireTime unversioned.Time `json:"acquireTime"`
|
||||
RenewTime unversioned.Time `json:"renewTime"`
|
||||
LeaderTransitions int `json:"leaderTransitions"`
|
||||
}
|
||||
|
||||
// ResourceLockConfig common data that exists across different
|
||||
// resource locks
|
||||
type ResourceLockConfig struct {
|
||||
Identity string
|
||||
EventRecorder record.EventRecorder
|
||||
}
|
||||
|
||||
// Interface offers a common interface for locking on arbitrary
|
||||
// resources used in leader election. The Interface is used
|
||||
// to hide the details on specific implementations in order to allow
|
||||
// them to change over time. This interface is strictly for use
|
||||
// by the leaderelection code.
|
||||
type Interface interface {
|
||||
// Get returns the LeaderElectionRecord
|
||||
Get() (*LeaderElectionRecord, error)
|
||||
|
||||
// Create attempts to create a LeaderElectionRecord
|
||||
Create(ler LeaderElectionRecord) error
|
||||
|
||||
// Update will update and existing LeaderElectionRecord
|
||||
Update(ler LeaderElectionRecord) error
|
||||
|
||||
// RecordEvent is used to record events
|
||||
RecordEvent(string)
|
||||
|
||||
// Identity will return the locks Identity
|
||||
Identity() string
|
||||
|
||||
// Describe is used to convert details on current resource lock
|
||||
// into a string
|
||||
Describe() string
|
||||
}
|
@ -29,6 +29,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
@ -145,14 +146,21 @@ func Run(s *options.SchedulerServer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
|
||||
// TODO: enable other lock types
|
||||
rl := resourcelock.EndpointsLock{
|
||||
EndpointsMeta: api.ObjectMeta{
|
||||
Namespace: "kube-system",
|
||||
Name: "kube-scheduler",
|
||||
},
|
||||
Client: leaderElectionClient,
|
||||
Identity: id,
|
||||
EventRecorder: config.Recorder,
|
||||
Client: leaderElectionClient,
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: id,
|
||||
EventRecorder: config.Recorder,
|
||||
},
|
||||
}
|
||||
|
||||
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
|
||||
Lock: &rl,
|
||||
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
|
||||
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
|
||||
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
|
||||
|
Loading…
Reference in New Issue
Block a user