1
0
mirror of https://github.com/rancher/norman.git synced 2025-09-02 15:54:32 +00:00

Add leader election wrapper

This commit is contained in:
Darren Shepherd
2018-02-20 12:49:17 -07:00
parent 193c79e3fb
commit 3985a73858

77
leader/leader.go Normal file
View File

@@ -0,0 +1,77 @@
package leader
import (
"context"
"os"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
)
type Callback func(cb context.Context)
func RunOrDie(ctx context.Context, name string, client kubernetes.Interface, cb Callback) {
err := run(ctx, name, client, cb)
if err != nil {
logrus.Fatalf("Failed to start leader election for %s", name)
}
panic("Failed to start leader election for " + name)
}
func run(ctx context.Context, name string, client kubernetes.Interface, cb Callback) error {
id, err := os.Hostname()
if err != nil {
return err
}
le := leaderelectionconfig.DefaultLeaderElectionConfiguration()
le.LeaderElect = true
le.ResourceLock = resourcelock.ConfigMapsResourceLock
recorder := createRecorder(name, client)
rl, err := resourcelock.New(le.ResourceLock,
"kube-system",
name,
client.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
logrus.Fatalf("error creating leader lock for %s: %v", name, err)
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: le.LeaseDuration.Duration,
RenewDeadline: le.RenewDeadline.Duration,
RetryPeriod: le.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(stop <-chan struct{}) {
subCtx, cancel := context.WithCancel(ctx)
go cb(subCtx)
<-stop
cancel()
},
OnStoppedLeading: func() {
logrus.Fatalf("leaderelection lost for %s", name)
},
},
})
panic("unreachable")
}
func createRecorder(name string, kubeClient kubernetes.Interface) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logrus.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
return eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: name})
}