Merge pull request #19356 from mikedanese/le-2

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-01-12 21:33:34 -08:00
commit a47c170377
2 changed files with 44 additions and 8 deletions

View File

@ -121,12 +121,15 @@ type LeaderElectionConfig struct {
//
// possible future callbacks:
// * OnChallenge()
// * OnNewLeader()
type LeaderCallbacks struct {
// OnStartedLeading is called when a LeaderElector client starts leading
OnStartedLeading func(stop <-chan struct{})
// OnStoppedLeading is called when a LeaderElector client stops leading
OnStoppedLeading func()
// OnNewLeader is called when the client observes a leader that is
// not the previously observed leader. This includes the first observed
// leader when the client starts.
OnNewLeader func(identity string)
}
// LeaderElector is a leader election client.
@ -139,6 +142,10 @@ type LeaderElector struct {
// internal bookkeeping
observedRecord LeaderElectionRecord
observedTime time.Time
// used to implement OnNewLeader(), may lag slightly from the
// value observedRecord.HolderIdentity if the transistion has
// not yet been reported.
reportedLeader string
}
// LeaderElectionRecord is the record that is stored in the leader election annotation.
@ -182,6 +189,7 @@ func (le *LeaderElector) acquire() {
stop := make(chan struct{})
util.Until(func() {
succeeded := le.tryAcquireOrRenew()
le.maybeReportTransition()
if !succeeded {
glog.V(4).Infof("failed to renew lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
time.Sleep(wait.Jitter(le.config.RetryPeriod, JitterFactor))
@ -200,6 +208,7 @@ func (le *LeaderElector) renew() {
err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) {
return le.tryAcquireOrRenew(), nil
})
le.maybeReportTransition()
if err == nil {
glog.V(4).Infof("succesfully renewed lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
return
@ -296,3 +305,13 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
le.observedTime = time.Now()
return true
}
func (l *LeaderElector) maybeReportTransition() {
if l.observedRecord.HolderIdentity == l.reportedLeader {
return
}
l.reportedLeader = l.observedRecord.HolderIdentity
if l.config.Callbacks.OnNewLeader != nil {
go l.config.Callbacks.OnNewLeader(l.reportedLeader)
}
}

View File

@ -22,6 +22,7 @@ package leaderelection
import (
"fmt"
"sync"
"testing"
"time"
@ -195,14 +196,24 @@ func TestTryAcquireOrRenew(t *testing.T) {
},
}
lec := LeaderElectionConfig{
EndpointsMeta: api.ObjectMeta{Namespace: "foo", Name: "bar"},
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
LeaseDuration: 10 * time.Second,
}
for i, test := range tests {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
lec := LeaderElectionConfig{
EndpointsMeta: api.ObjectMeta{Namespace: "foo", Name: "bar"},
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
},
},
}
c := &testclient.Fake{}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, "endpoints", reactor.reaction)
@ -237,5 +248,11 @@ func TestTryAcquireOrRenew(t *testing.T) {
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("[%v]leader should not have transitioned but did", i)
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader)
}
}
}