From 441baf07d618a40d11b5c7e9d842f4406e03052e Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Wed, 6 Jan 2016 16:19:01 -0800 Subject: [PATCH] add OnNewLeader callback to leaderelection client --- pkg/client/leaderelection/leaderelection.go | 21 ++++++++++++- .../leaderelection/leaderelection_test.go | 31 ++++++++++++++----- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/pkg/client/leaderelection/leaderelection.go b/pkg/client/leaderelection/leaderelection.go index 4ca7feca98d..cdd29a54039 100644 --- a/pkg/client/leaderelection/leaderelection.go +++ b/pkg/client/leaderelection/leaderelection.go @@ -121,12 +121,15 @@ type LeaderElectionConfig struct { // // possible future callbacks: // * OnChallenge() -// * OnNewLeader() type LeaderCallbacks struct { // OnStartedLeading is called when a LeaderElector client starts leading OnStartedLeading func(stop <-chan struct{}) // OnStoppedLeading is called when a LeaderElector client stops leading OnStoppedLeading func() + // OnNewLeader is called when the client observes a leader that is + // not the previously observed leader. This includes the first observed + // leader when the client starts. + OnNewLeader func(identity string) } // LeaderElector is a leader election client. @@ -139,6 +142,10 @@ type LeaderElector struct { // internal bookkeeping observedRecord LeaderElectionRecord observedTime time.Time + // used to implement OnNewLeader(), may lag slightly from the + // value observedRecord.HolderIdentity if the transistion has + // not yet been reported. + reportedLeader string } // LeaderElectionRecord is the record that is stored in the leader election annotation. @@ -182,6 +189,7 @@ func (le *LeaderElector) acquire() { stop := make(chan struct{}) util.Until(func() { succeeded := le.tryAcquireOrRenew() + le.maybeReportTransition() if !succeeded { glog.V(4).Infof("failed to renew lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name) time.Sleep(wait.Jitter(le.config.RetryPeriod, JitterFactor)) @@ -200,6 +208,7 @@ func (le *LeaderElector) renew() { err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) { return le.tryAcquireOrRenew(), nil }) + le.maybeReportTransition() if err == nil { glog.V(4).Infof("succesfully renewed lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name) return @@ -296,3 +305,13 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { le.observedTime = time.Now() return true } + +func (l *LeaderElector) maybeReportTransition() { + if l.observedRecord.HolderIdentity == l.reportedLeader { + return + } + l.reportedLeader = l.observedRecord.HolderIdentity + if l.config.Callbacks.OnNewLeader != nil { + go l.config.Callbacks.OnNewLeader(l.reportedLeader) + } +} diff --git a/pkg/client/leaderelection/leaderelection_test.go b/pkg/client/leaderelection/leaderelection_test.go index 63f38c1e6d6..cd880f73e1a 100644 --- a/pkg/client/leaderelection/leaderelection_test.go +++ b/pkg/client/leaderelection/leaderelection_test.go @@ -22,6 +22,7 @@ package leaderelection import ( "fmt" + "sync" "testing" "time" @@ -195,14 +196,24 @@ func TestTryAcquireOrRenew(t *testing.T) { }, } - lec := LeaderElectionConfig{ - EndpointsMeta: api.ObjectMeta{Namespace: "foo", Name: "bar"}, - Identity: "baz", - EventRecorder: &record.FakeRecorder{}, - LeaseDuration: 10 * time.Second, - } - for i, test := range tests { + // OnNewLeader is called async so we have to wait for it. + var wg sync.WaitGroup + wg.Add(1) + var reportedLeader string + + lec := LeaderElectionConfig{ + EndpointsMeta: api.ObjectMeta{Namespace: "foo", Name: "bar"}, + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, + LeaseDuration: 10 * time.Second, + Callbacks: LeaderCallbacks{ + OnNewLeader: func(l string) { + defer wg.Done() + reportedLeader = l + }, + }, + } c := &testclient.Fake{} for _, reactor := range test.reactors { c.AddReactor(reactor.verb, "endpoints", reactor.reaction) @@ -237,5 +248,11 @@ func TestTryAcquireOrRenew(t *testing.T) { if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { t.Errorf("[%v]leader should not have transitioned but did", i) } + + le.maybeReportTransition() + wg.Wait() + if reportedLeader != test.outHolder { + t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader) + } } }