diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 807ba85abc9..9290903df44 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -78,10 +78,10 @@ type MesosScheduler struct { executor *mesos.ExecutorInfo executorGroup uint64 client *client.Client - etcdClient tools.EtcdClient failoverTimeout float64 // in seconds reconcileInterval int64 nodeRegistrator node.Registrator + storeFrameworkId func(id string) // Mesos context. @@ -112,7 +112,7 @@ type Config struct { Executor *mesos.ExecutorInfo PodScheduler podschedulers.PodScheduler Client *client.Client - EtcdClient tools.EtcdClient + StoreFrameworkId func(id string) FailoverTimeout float64 ReconcileInterval int64 ReconcileCooldown time.Duration @@ -129,7 +129,6 @@ func New(config Config) *MesosScheduler { executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(), podScheduler: config.PodScheduler, client: config.Client, - etcdClient: config.EtcdClient, failoverTimeout: config.FailoverTimeout, reconcileInterval: config.ReconcileInterval, nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode), @@ -174,6 +173,7 @@ func New(config Config) *MesosScheduler { asRegisteredMaster: proc.DoerFunc(func(proc.Action) <-chan error { return proc.ErrorChanf("cannot execute action with unregistered scheduler") }), + storeFrameworkId: config.StoreFrameworkId, } return k } @@ -266,14 +266,6 @@ func (k *MesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.Fra k.reconciler.RequestExplicit() } -func (k *MesosScheduler) storeFrameworkId() { - // TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available - _, err := k.etcdClient.Set(meta.FrameworkIDKey, k.frameworkId.GetValue(), uint64(k.failoverTimeout)) - if err != nil { - log.Errorf("failed to renew frameworkId TTL: %v", err) - } -} - // Reregistered is called when the scheduler re-registered with the master successfully. // This happends when the master fails over. func (k *MesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { @@ -296,7 +288,9 @@ func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() { refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second } - go runtime.Until(k.storeFrameworkId, refreshInterval, k.terminate) + go runtime.Until(func() { + k.storeFrameworkId(k.frameworkId.GetValue()) + }, refreshInterval, k.terminate) } r1 := k.makeTaskRegistryReconciler() diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index a8b46a0e47b..6c162edfdaf 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -55,11 +55,11 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/profile" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid" @@ -723,11 +723,17 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config Executor: executor, PodScheduler: fcfs, Client: client, - EtcdClient: etcdClient, FailoverTimeout: s.FailoverTimeout, ReconcileInterval: s.ReconcileInterval, ReconcileCooldown: s.ReconcileCooldown, LookupNode: lookupNode, + StoreFrameworkId: func(id string) { + // TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available + _, err := etcdClient.Set(meta.FrameworkIDKey, id, uint64(s.FailoverTimeout)) + if err != nil { + log.Errorf("failed to renew frameworkId TTL: %v", err) + } + }, }) masterUri := s.MesosMaster