diff --git a/contrib/mesos/pkg/scheduler/integration_test.go b/contrib/mesos/pkg/scheduler/integration_test.go index 4045c5ebe8b..4a6c6fe072a 100644 --- a/contrib/mesos/pkg/scheduler/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration_test.go @@ -427,6 +427,7 @@ type lifecycleTest struct { driver *mmock.JoinableDriver eventObs *EventObserver loop operations.SchedulerLoopInterface + podReconciler *operations.PodReconciler podsListWatch *MockPodsListWatch scheduler *MesosScheduler schedulerProc *ha.SchedulerProcess @@ -485,7 +486,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest { // create scheduler loop fw := &MesosFramework{MesosScheduler: mesosScheduler} eventObs := NewEventObserver() - loop := operations.NewSchedulerLoop(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) + loop, _ := operations.NewScheduler(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch) assert.NotNil(loop) // create mock mesos scheduler driver @@ -510,7 +511,7 @@ func (lt lifecycleTest) Start() <-chan LaunchedTask { // init scheduler err := lt.scheduler.Init( lt.schedulerProc.Master(), - lt.loop, + lt.podReconciler, http.DefaultServeMux, ) assert.NoError(err) diff --git a/contrib/mesos/pkg/scheduler/operations/schedulerloop.go b/contrib/mesos/pkg/scheduler/operations/schedulerloop.go index 486a6691468..450240b3847 100644 --- a/contrib/mesos/pkg/scheduler/operations/schedulerloop.go +++ b/contrib/mesos/pkg/scheduler/operations/schedulerloop.go @@ -42,9 +42,6 @@ const ( ) type SchedulerLoopInterface interface { - ReconcilePodTask(t *podtask.T) - - // execute the Scheduling plugin, should start a go routine and return immediately Run(<-chan struct{}) } @@ -55,12 +52,11 @@ type SchedulerLoop struct { error func(*api.Pod, error) recorder record.EventRecorder client *client.Client - pr *PodReconciler - starting chan struct{} // startup latch + started chan<- struct{} // startup latch } -func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Client, recorder record.EventRecorder, - terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *SchedulerLoop { +func NewScheduler(c *config.Config, fw types.Framework, client *client.Client, recorder record.EventRecorder, + terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) (SchedulerLoopInterface, *PodReconciler) { // Watch and queue pods that need scheduling. updates := make(chan queue.Entry, c.UpdatesBacklog) @@ -74,10 +70,10 @@ func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Clien q := queuer.New(podUpdates) podDeleter := NewDeleter(fw, q) podReconciler := NewPodReconciler(fw, client, q, podDeleter) - bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration) - eh := NewErrorHandler(fw, bo, q) + startLatch := make(chan struct{}) eventBroadcaster := record.NewBroadcaster() + runtime.On(startLatch, func() { eventBroadcaster.StartRecordingToSink(client.Events("")) reflector.Run() // TODO(jdef) should listen for termination @@ -87,20 +83,27 @@ func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Clien q.InstallDebugHandlers(mux) podtask.InstallDebugHandlers(fw.Tasks(), mux) }) + + return NewSchedulerLoop(c, fw, client, recorder, podUpdates, q, startLatch), podReconciler +} + +func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Client, + recorder record.EventRecorder, podUpdates queue.FIFO, q *queuer.Queuer, + started chan<- struct{}) *SchedulerLoop { + bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration) return &SchedulerLoop{ algorithm: NewSchedulerAlgorithm(fw, podUpdates), binder: NewBinder(fw), nextPod: q.Yield, - error: eh.Error, + error: NewErrorHandler(fw, bo, q).Error, recorder: recorder, client: client, - pr: podReconciler, - starting: startLatch, + started: started, } } func (s *SchedulerLoop) Run(done <-chan struct{}) { - defer close(s.starting) + defer close(s.started) go runtime.Until(s.scheduleOne, recoveryDelay, done) } @@ -141,7 +144,3 @@ func (s *SchedulerLoop) scheduleOne() { } s.recorder.Eventf(pod, Scheduled, "Successfully assigned %v to %v", pod.Name, dest) } - -func (s *SchedulerLoop) ReconcilePodTask(t *podtask.T) { - s.pr.Reconcile(t) -} diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 4359cb05a54..03dde8a4239 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -56,8 +56,7 @@ import ( // KubernetesScheduler implements: // 1: A mesos scheduler. -// 2: A kubernetes scheduler plugin. -// 3: A kubernetes pod.Registry. +// 2: A kubernetes pod.Registry. type MesosScheduler struct { // We use a lock here to avoid races // between invoking the mesos callback @@ -93,9 +92,8 @@ type MesosScheduler struct { taskRegistry podtask.Registry // via deferred init - - loop operations.SchedulerLoopInterface - reconciler *operations.TasksReconciler + podReconciler *operations.PodReconciler + tasksReconciler *operations.TasksReconciler reconcileCooldown time.Duration asRegisteredMaster proc.Doer terminate <-chan struct{} // signal chan, closes when we should kill background tasks @@ -172,7 +170,7 @@ func New(config Config) *MesosScheduler { return k } -func (k *MesosScheduler) Init(electedMaster proc.Process, sl operations.SchedulerLoopInterface, mux *http.ServeMux) error { +func (k *MesosScheduler) Init(electedMaster proc.Process, pr *operations.PodReconciler, mux *http.ServeMux) error { log.V(1).Infoln("initializing kubernetes mesos scheduler") k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error { @@ -182,7 +180,7 @@ func (k *MesosScheduler) Init(electedMaster proc.Process, sl operations.Schedule return electedMaster.Do(a) }) k.terminate = electedMaster.Done() - k.loop = sl + k.podReconciler = pr k.offers.Init(k.terminate) k.InstallDebugHandlers(mux) k.nodeRegistrator.Run(k.terminate) @@ -223,8 +221,8 @@ func (k *MesosScheduler) InstallDebugHandlers(mux *http.ServeMux) { w.WriteHeader(http.StatusNoContent) })) } - requestReconciliation("/debug/actions/requestExplicit", k.reconciler.RequestExplicit) - requestReconciliation("/debug/actions/requestImplicit", k.reconciler.RequestImplicit) + requestReconciliation("/debug/actions/requestExplicit", k.tasksReconciler.RequestExplicit) + requestReconciliation("/debug/actions/requestImplicit", k.tasksReconciler.RequestImplicit) wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { slaves := k.slaveHostNames.SlaveIDs() @@ -257,7 +255,7 @@ func (k *MesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.Fra k.registered = true k.onRegistration.Do(func() { k.onInitialRegistration(drv) }) - k.reconciler.RequestExplicit() + k.tasksReconciler.RequestExplicit() } // Reregistered is called when the scheduler re-registered with the master successfully. @@ -270,7 +268,7 @@ func (k *MesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.Ma k.registered = true k.onRegistration.Do(func() { k.onInitialRegistration(drv) }) - k.reconciler.RequestExplicit() + k.tasksReconciler.RequestExplicit() } // perform one-time initialization actions upon the first registration event received from Mesos. @@ -290,13 +288,13 @@ func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver) r1 := k.makeTaskRegistryReconciler() r2 := k.makePodRegistryReconciler() - k.reconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), + k.tasksReconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate) - go k.reconciler.Run(driver) + go k.tasksReconciler.Run(driver) if k.reconcileInterval > 0 { ri := time.Duration(k.reconcileInterval) * time.Second - time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.reconciler.RequestImplicit, ri, k.terminate) }) + time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.tasksReconciler.RequestImplicit, ri, k.terminate) }) log.Infof("will perform implicit task reconciliation at interval: %v after %v", ri, k.schedulerConfig.InitialImplicitReconciliationDelay.Duration) } } @@ -392,7 +390,7 @@ func (k *MesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatu case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR: if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil { if task.Has(podtask.Launched) && !task.Has(podtask.Bound) { - go k.loop.ReconcilePodTask(task) + go k.podReconciler.Reconcile(task) return } } else { @@ -440,7 +438,7 @@ func (k *MesosScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver, } else if taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED || taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED { // attempt to prevent dangling pods in the pod and task registries log.V(1).Infof("request explicit reconciliation to clean up for task %v after executor reported (terminated/unregistered)", taskStatus.TaskId.GetValue()) - k.reconciler.RequestExplicit() + k.tasksReconciler.RequestExplicit() } else if taskStatus.GetState() == mesos.TaskState_TASK_LOST && state == podtask.StateRunning && taskStatus.ExecutorId != nil && taskStatus.SlaveId != nil { //TODO(jdef) this may not be meaningful once we have proper checkpointing and master detection //If we're reconciling and receive this then the executor may be diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 47c4f3cde56..43a3d30e90a 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -765,14 +765,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}) lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything()) - loop := operations.NewSchedulerLoop(sc, fw, client, recorder, schedulerProcess.Terminal(), s.mux, lw) + loop, pr := operations.NewScheduler(sc, fw, client, recorder, schedulerProcess.Terminal(), s.mux, lw) runtime.On(mesosScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) }) runtime.On(mesosScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal())) driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) { log.V(1).Infoln("performing deferred initialization") - if err = mesosScheduler.Init(schedulerProcess.Master(), loop, s.mux); err != nil { + if err = mesosScheduler.Init(schedulerProcess.Master(), pr, s.mux); err != nil { return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err) } log.V(1).Infoln("deferred init complete")