Move etcd code from MesosScheduler into service

This commit is contained in:
Dr. Stefan Schimanski 2015-10-26 21:10:27 -05:00
parent d4675f1dc3
commit 23fa56adb1
2 changed files with 14 additions and 14 deletions

View File

@ -78,10 +78,10 @@ type MesosScheduler struct {
executor *mesos.ExecutorInfo executor *mesos.ExecutorInfo
executorGroup uint64 executorGroup uint64
client *client.Client client *client.Client
etcdClient tools.EtcdClient
failoverTimeout float64 // in seconds failoverTimeout float64 // in seconds
reconcileInterval int64 reconcileInterval int64
nodeRegistrator node.Registrator nodeRegistrator node.Registrator
storeFrameworkId func(id string)
// Mesos context. // Mesos context.
@ -112,7 +112,7 @@ type Config struct {
Executor *mesos.ExecutorInfo Executor *mesos.ExecutorInfo
PodScheduler podschedulers.PodScheduler PodScheduler podschedulers.PodScheduler
Client *client.Client Client *client.Client
EtcdClient tools.EtcdClient StoreFrameworkId func(id string)
FailoverTimeout float64 FailoverTimeout float64
ReconcileInterval int64 ReconcileInterval int64
ReconcileCooldown time.Duration ReconcileCooldown time.Duration
@ -129,7 +129,6 @@ func New(config Config) *MesosScheduler {
executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(), executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
podScheduler: config.PodScheduler, podScheduler: config.PodScheduler,
client: config.Client, client: config.Client,
etcdClient: config.EtcdClient,
failoverTimeout: config.FailoverTimeout, failoverTimeout: config.FailoverTimeout,
reconcileInterval: config.ReconcileInterval, reconcileInterval: config.ReconcileInterval,
nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode), nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode),
@ -174,6 +173,7 @@ func New(config Config) *MesosScheduler {
asRegisteredMaster: proc.DoerFunc(func(proc.Action) <-chan error { asRegisteredMaster: proc.DoerFunc(func(proc.Action) <-chan error {
return proc.ErrorChanf("cannot execute action with unregistered scheduler") return proc.ErrorChanf("cannot execute action with unregistered scheduler")
}), }),
storeFrameworkId: config.StoreFrameworkId,
} }
return k return k
} }
@ -266,14 +266,6 @@ func (k *MesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.Fra
k.reconciler.RequestExplicit() k.reconciler.RequestExplicit()
} }
func (k *MesosScheduler) storeFrameworkId() {
// TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available
_, err := k.etcdClient.Set(meta.FrameworkIDKey, k.frameworkId.GetValue(), uint64(k.failoverTimeout))
if err != nil {
log.Errorf("failed to renew frameworkId TTL: %v", err)
}
}
// Reregistered is called when the scheduler re-registered with the master successfully. // Reregistered is called when the scheduler re-registered with the master successfully.
// This happends when the master fails over. // This happends when the master fails over.
func (k *MesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { func (k *MesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) {
@ -296,7 +288,9 @@ func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver)
if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() { if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() {
refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second
} }
go runtime.Until(k.storeFrameworkId, refreshInterval, k.terminate) go runtime.Until(func() {
k.storeFrameworkId(k.frameworkId.GetValue())
}, refreshInterval, k.terminate)
} }
r1 := k.makeTaskRegistryReconciler() r1 := k.makeTaskRegistryReconciler()

View File

@ -55,11 +55,11 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/profile" "k8s.io/kubernetes/contrib/mesos/pkg/profile"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
@ -723,11 +723,17 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
Executor: executor, Executor: executor,
PodScheduler: fcfs, PodScheduler: fcfs,
Client: client, Client: client,
EtcdClient: etcdClient,
FailoverTimeout: s.FailoverTimeout, FailoverTimeout: s.FailoverTimeout,
ReconcileInterval: s.ReconcileInterval, ReconcileInterval: s.ReconcileInterval,
ReconcileCooldown: s.ReconcileCooldown, ReconcileCooldown: s.ReconcileCooldown,
LookupNode: lookupNode, LookupNode: lookupNode,
StoreFrameworkId: func(id string) {
// TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available
_, err := etcdClient.Set(meta.FrameworkIDKey, id, uint64(s.FailoverTimeout))
if err != nil {
log.Errorf("failed to renew frameworkId TTL: %v", err)
}
},
}) })
masterUri := s.MesosMaster masterUri := s.MesosMaster