retrofit the scheduler with the leader election client.

Signed-off-by: Mike Danese <mikedanese@google.com>
This commit is contained in:
Mike Danese
2016-01-06 13:02:34 -08:00
parent a47c170377
commit f71657d9a6
5 changed files with 107 additions and 5 deletions

View File

@@ -21,6 +21,7 @@ import (
"net"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
@@ -41,6 +42,7 @@ type SchedulerServer struct {
KubeAPIQPS float32
KubeAPIBurst int
SchedulerName string
LeaderElection leaderelection.LeaderElectionCLIConfig
}
// NewSchedulerServer creates a new SchedulerServer with default parameters
@@ -54,6 +56,7 @@ func NewSchedulerServer() *SchedulerServer {
KubeAPIQPS: 50.0,
KubeAPIBurst: 100,
SchedulerName: api.DefaultSchedulerName,
LeaderElection: leaderelection.DefaultLeaderElectionCLIConfig(),
}
return &s
}
@@ -72,4 +75,5 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
s.LeaderElection.BindFlags(fs)
}

View File

@@ -27,6 +27,7 @@ import (
"strconv"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
@@ -110,9 +111,44 @@ func Run(s *options.SchedulerServer) error {
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
sched := scheduler.New(config)
sched.Run()
select {}
run := func(_ <-chan struct{}) {
sched.Run()
select {}
}
if !s.LeaderElection.LeaderElect {
run(nil)
glog.Fatal("this statement is unreachable")
panic("unreachable")
}
id, err := os.Hostname()
if err != nil {
return err
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
EndpointsMeta: api.ObjectMeta{
Namespace: "kube-system",
Name: "kube-scheduler",
},
Client: kubeClient,
Identity: id,
EventRecorder: config.Recorder,
LeaseDuration: s.LeaderElection.LeaseDuration,
RenewDeadline: s.LeaderElection.RenewDeadline,
RetryPeriod: s.LeaderElection.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("lost master")
},
},
})
glog.Fatal("this statement is unreachable")
panic("unreachable")
}
func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) {