package leader

import (
	"context"
	"os"
	"sync"

	"github.com/sirupsen/logrus"
	"k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
)

// State tracks the identity of the current leader and whether this instance
// currently holds the leader lock. It is safe for concurrent use.
type State struct {
	sync.Mutex
	identity string
	leader   bool
}

// Get returns the identity of the current leader and whether this instance
// is the leader.
func (l *State) Get() (string, bool) {
	l.Lock()
	defer l.Unlock()
	return l.identity, l.leader
}

// Status records the identity of the current leader and whether this
// instance is the leader. Its signature matches StatusCallback.
func (l *State) Status(identity string, leader bool) {
	l.Lock()
	l.identity = identity
	l.leader = leader
	l.Unlock()
}

// Callback is invoked once leadership is acquired; its context is cancelled
// when leadership is lost.
type Callback func(ctx context.Context)

// StatusCallback is invoked whenever the observed leader changes.
type StatusCallback func(identity string, leader bool)

// RunOrDie starts leader election under the given name and blocks. The
// callback is invoked once this instance becomes the leader; the process
// exits if election cannot be started or leadership is lost.
func RunOrDie(ctx context.Context, name string, client kubernetes.Interface, cb Callback, status StatusCallback) {
	err := run(ctx, name, client, cb, status)
	if err != nil {
		logrus.Fatalf("Failed to start leader election for %s: %v", name, err)
	}
	// run is not expected to return without an error.
	panic("Failed to start leader election for " + name)
}

func run(ctx context.Context, name string, client kubernetes.Interface, cb Callback, status StatusCallback) error {
	id, err := os.Hostname()
	if err != nil {
		return err
	}

	le := leaderelectionconfig.DefaultLeaderElectionConfiguration()
	le.LeaderElect = true
	le.ResourceLock = resourcelock.ConfigMapsResourceLock

	recorder := createRecorder(name, client)

	// The lock is a ConfigMap named after the controller in kube-system,
	// with this instance's hostname as its identity.
	rl, err := resourcelock.New(le.ResourceLock,
		"kube-system",
		name,
		client.CoreV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: recorder,
		})
	if err != nil {
		logrus.Fatalf("error creating leader lock for %s: %v", name, err)
	}

	// Report that this instance is not (yet) the leader.
	status(id, false)

	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: le.LeaseDuration.Duration,
		RenewDeadline: le.RenewDeadline.Duration,
		RetryPeriod:   le.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(stop <-chan struct{}) {
				// Run the callback with a context that is cancelled once
				// leadership is lost.
				subCtx, cancel := context.WithCancel(ctx)
				go cb(subCtx)
				status(id, true)
				<-stop
				cancel()
			},
			OnStoppedLeading: func() {
				logrus.Fatalf("leaderelection lost for %s", name)
			},
			OnNewLeader: func(identity string) {
				status(identity, identity == id)
			},
		},
	})
	panic("unreachable")
}

// createRecorder builds an event recorder that both logs events and records
// them to the Kubernetes API under the given component name.
func createRecorder(name string, kubeClient kubernetes.Interface) record.EventRecorder {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(logrus.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
	return eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: name})
}
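
// exampleRun is a minimal, illustrative sketch of how a caller might wire
// RunOrDie together with State; it is not used by this package, and the
// controller name "example-controller" and the work done inside the callback
// are assumptions for illustration only.
func exampleRun(ctx context.Context, client kubernetes.Interface) {
	state := &State{}

	// Blocks for the lifetime of the process; the callback runs only while
	// this instance holds the leader lock, and its context is cancelled if
	// leadership is lost.
	RunOrDie(ctx, "example-controller", client,
		func(ctx context.Context) {
			identity, isLeader := state.Get()
			logrus.Infof("leader is %s (this instance leading: %v)", identity, isLeader)
			<-ctx.Done()
		},
		state.Status)
}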