diff --git a/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go b/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go index ff62bf26064..007457b6e48 100644 --- a/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go +++ b/contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go @@ -30,15 +30,19 @@ import ( "k8s.io/kubernetes/pkg/client/cache" ) +type SchedulerAlgorithm interface { + Schedule(pod *api.Pod) (string, error) +} + // SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface -type SchedulerAlgorithm struct { +type schedulerAlgorithm struct { sched types.Scheduler podUpdates queue.FIFO podScheduler podschedulers.PodScheduler } -func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) *SchedulerAlgorithm { - return &SchedulerAlgorithm{ +func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) SchedulerAlgorithm { + return &schedulerAlgorithm{ sched: sched, podUpdates: podUpdates, podScheduler: podScheduler, @@ -47,7 +51,7 @@ func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podSche // Schedule implements the Scheduler interface of Kubernetes. // It returns the selectedMachine's name and error (if there's any). -func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) { +func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) { log.Infof("Try to schedule pod %v\n", pod.Name) ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) @@ -105,7 +109,7 @@ func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) { } // Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on -func (k *SchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) { +func (k *schedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) { var offer offers.Perishable if task.HasAcceptedOffer() { // verify that the offer is still on the table diff --git a/contrib/mesos/pkg/scheduler/components/binder/binder.go b/contrib/mesos/pkg/scheduler/components/binder/binder.go index 06017216364..81e999b9040 100644 --- a/contrib/mesos/pkg/scheduler/components/binder/binder.go +++ b/contrib/mesos/pkg/scheduler/components/binder/binder.go @@ -28,18 +28,22 @@ import ( "k8s.io/kubernetes/pkg/api" ) -type Binder struct { +type Binder interface { + Bind(binding *api.Binding) error +} + +type binder struct { sched types.Scheduler } -func NewBinder(sched types.Scheduler) *Binder { - return &Binder{ +func NewBinder(sched types.Scheduler) Binder { + return &binder{ sched: sched, } } // implements binding.Registry, launches the pod-associated-task in mesos -func (b *Binder) Bind(binding *api.Binding) error { +func (b *binder) Bind(binding *api.Binding) error { ctx := api.WithNamespace(api.NewContext(), binding.Namespace) @@ -63,7 +67,7 @@ func (b *Binder) Bind(binding *api.Binding) error { } } -func (b *Binder) rollback(task *podtask.T, err error) error { +func (b *binder) rollback(task *podtask.T, err error) error { task.Offer.Release() task.Reset() if err2 := b.sched.Tasks().Update(task); err2 != nil { @@ -78,7 +82,7 @@ func (b *Binder) rollback(task *podtask.T, err error) error { // kubernetes executor on the slave will finally do the binding. This is different from the // upstream scheduler in the sense that the upstream scheduler does the binding and the // kubelet will notice that and launches the pod. -func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) { +func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) { // sanity check: ensure that the task hasAcceptedOffer(), it's possible that between // Schedule() and now that the offer for this task was rescinded or invalidated. // ((we should never see this here)) @@ -111,7 +115,7 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e } //TODO(jdef) unit test this, ensure that task's copy of api.Pod is not modified -func (b *Binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error { +func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error { pod := task.Pod // we make an effort here to avoid making changes to the task's copy of the pod, since diff --git a/contrib/mesos/pkg/scheduler/components/deleter/deleter.go b/contrib/mesos/pkg/scheduler/components/deleter/deleter.go index 19f3f7bfd1e..9e8df2f33ec 100644 --- a/contrib/mesos/pkg/scheduler/components/deleter/deleter.go +++ b/contrib/mesos/pkg/scheduler/components/deleter/deleter.go @@ -29,13 +29,18 @@ import ( "k8s.io/kubernetes/pkg/api" ) -type Deleter struct { +type Deleter interface { + Run(updates <-chan queue.Entry, done <-chan struct{}) + DeleteOne(pod *queuer.Pod) error +} + +type deleter struct { sched types.Scheduler qr *queuer.Queuer } -func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) *Deleter { - return &Deleter{ +func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) Deleter { + return &deleter{ sched: sched, qr: qr, } @@ -43,7 +48,7 @@ func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) *Deleter { // currently monitors for "pod deleted" events, upon which handle() // is invoked. -func (k *Deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) { +func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) { go runtime.Until(func() { for { entry := <-updates @@ -59,7 +64,7 @@ func (k *Deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) { }, 1*time.Second, done) } -func (k *Deleter) DeleteOne(pod *queuer.Pod) error { +func (k *deleter) DeleteOne(pod *queuer.Pod) error { ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) podKey, err := podtask.MakePodKey(ctx, pod.Name) if err != nil { diff --git a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go index 50ea1f435ba..403e250c628 100644 --- a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go +++ b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go @@ -30,15 +30,19 @@ import ( "k8s.io/kubernetes/pkg/util" ) -type ErrorHandler struct { +type ErrorHandler interface { + Error(pod *api.Pod, schedulingErr error) +} + +type errorHandler struct { sched types.Scheduler backoff *backoff.Backoff qr *queuer.Queuer podScheduler podschedulers.PodScheduler } -func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) *ErrorHandler { - return &ErrorHandler{ +func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) ErrorHandler { + return &errorHandler{ sched: sched, backoff: backoff, qr: qr, @@ -47,7 +51,7 @@ func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer } // implementation of scheduling plugin's Error func; see plugin/pkg/scheduler -func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) { +func (k *errorHandler) Error(pod *api.Pod, schedulingErr error) { if schedulingErr == merrors.NoSuchPodErr { log.V(2).Infof("Not rescheduling non-existent pod %v", pod.Name) diff --git a/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go b/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go index e5718adb957..dca374d6410 100644 --- a/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go +++ b/contrib/mesos/pkg/scheduler/components/podreconciler/podreconciler.go @@ -31,15 +31,19 @@ import ( ) // PodReconciler reconciles a pod with the apiserver -type PodReconciler struct { +type PodReconciler interface { + Reconcile(t *podtask.T) +} + +type podReconciler struct { sched types.Scheduler client *client.Client qr *queuer.Queuer - deleter *deleter.Deleter + deleter deleter.Deleter } -func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Queuer, deleter *deleter.Deleter) *PodReconciler { - return &PodReconciler{ +func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Queuer, deleter deleter.Deleter) PodReconciler { + return &podReconciler{ sched: sched, client: client, qr: qr, @@ -58,7 +62,7 @@ func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Q // host="..." | host="..." ; perhaps no updates to process? // // TODO(jdef) this needs an integration test -func (s *PodReconciler) Reconcile(t *podtask.T) { +func (s *podReconciler) Reconcile(t *podtask.T) { log.V(1).Infof("reconcile pod %v, assigned to slave %q", t.Pod.Name, t.Spec.AssignedSlave) ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace) pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name) diff --git a/contrib/mesos/pkg/scheduler/components/schedulerloop/schedulerloop.go b/contrib/mesos/pkg/scheduler/components/schedulerloop/schedulerloop.go index 3552f60edc5..e4519c0dff4 100644 --- a/contrib/mesos/pkg/scheduler/components/schedulerloop/schedulerloop.go +++ b/contrib/mesos/pkg/scheduler/components/schedulerloop/schedulerloop.go @@ -35,13 +35,13 @@ const ( Scheduled = "Scheduled" ) -type SchedulerLoopInterface interface { +type SchedulerLoop interface { Run(<-chan struct{}) } -type SchedulerLoop struct { - algorithm *algorithm.SchedulerAlgorithm - binder *binder.Binder +type schedulerLoop struct { + algorithm algorithm.SchedulerAlgorithm + binder binder.Binder nextPod func() *api.Pod error func(*api.Pod, error) recorder record.EventRecorder @@ -49,10 +49,10 @@ type SchedulerLoop struct { started chan<- struct{} // startup latch } -func NewSchedulerLoop(client *client.Client, algorithm *algorithm.SchedulerAlgorithm, +func NewSchedulerLoop(client *client.Client, algorithm algorithm.SchedulerAlgorithm, recorder record.EventRecorder, nextPod func() *api.Pod, error func(pod *api.Pod, schedulingErr error), - binder *binder.Binder, started chan<- struct{}) *SchedulerLoop { - return &SchedulerLoop{ + binder binder.Binder, started chan<- struct{}) SchedulerLoop { + return &schedulerLoop{ algorithm: algorithm, binder: binder, nextPod: nextPod, @@ -63,14 +63,14 @@ func NewSchedulerLoop(client *client.Client, algorithm *algorithm.SchedulerAlgor } } -func (s *SchedulerLoop) Run(done <-chan struct{}) { +func (s *schedulerLoop) Run(done <-chan struct{}) { defer close(s.started) go runtime.Until(s.scheduleOne, recoveryDelay, done) } // hacked from GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go, // with the Modeler stuff removed since we don't use it because we have mesos. -func (s *SchedulerLoop) scheduleOne() { +func (s *schedulerLoop) scheduleOne() { pod := s.nextPod() // pods which are pre-scheduled (i.e. NodeName is set) are deleted by the kubelet diff --git a/contrib/mesos/pkg/scheduler/components/tasksreconciler/tasksreconciler.go b/contrib/mesos/pkg/scheduler/components/tasksreconciler/tasksreconciler.go index 37ed2250cae..9480d60963b 100644 --- a/contrib/mesos/pkg/scheduler/components/tasksreconciler/tasksreconciler.go +++ b/contrib/mesos/pkg/scheduler/components/tasksreconciler/tasksreconciler.go @@ -29,25 +29,29 @@ import ( type ReconcilerAction func(driver bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error -type TasksReconciler struct { +type TasksReconciler interface { + RequestExplicit() + RequestImplicit() + Run(driver bindings.SchedulerDriver, done <-chan struct{}) +} + +type tasksReconciler struct { proc.Doer Action ReconcilerAction explicit chan struct{} // send an empty struct to trigger explicit reconciliation implicit chan struct{} // send an empty struct to trigger implicit reconciliation - done <-chan struct{} // close this when you want the reconciler to exit cooldown time.Duration explicitReconciliationAbortTimeout time.Duration } func NewTasksReconciler(doer proc.Doer, action ReconcilerAction, - cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) *TasksReconciler { - return &TasksReconciler{ + cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) TasksReconciler { + return &tasksReconciler{ Doer: doer, explicit: make(chan struct{}, 1), implicit: make(chan struct{}, 1), cooldown: cooldown, explicitReconciliationAbortTimeout: explicitReconciliationAbortTimeout, - done: done, Action: func(driver bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { // trigged the reconciler action in the doer's execution context, // but it could take a while and the scheduler needs to be able to @@ -67,14 +71,14 @@ func NewTasksReconciler(doer proc.Doer, action ReconcilerAction, } } -func (r *TasksReconciler) RequestExplicit() { +func (r *tasksReconciler) RequestExplicit() { select { case r.explicit <- struct{}{}: // noop default: // request queue full; noop } } -func (r *TasksReconciler) RequestImplicit() { +func (r *tasksReconciler) RequestImplicit() { select { case r.implicit <- struct{}{}: // noop default: // request queue full; noop @@ -84,12 +88,12 @@ func (r *TasksReconciler) RequestImplicit() { // execute task reconciliation, returns when r.done is closed. intended to run as a goroutine. // if reconciliation is requested while another is in progress, the in-progress operation will be // cancelled before the new reconciliation operation begins. -func (r *TasksReconciler) Run(driver bindings.SchedulerDriver) { +func (r *tasksReconciler) Run(driver bindings.SchedulerDriver, done <-chan struct{}) { var cancel, finished chan struct{} requestLoop: for { select { - case <-r.done: + case <-done: return default: // proceed } @@ -97,7 +101,7 @@ requestLoop: case <-r.implicit: metrics.ReconciliationRequested.WithLabelValues("implicit").Inc() select { - case <-r.done: + case <-done: return case <-r.explicit: break // give preference to a pending request for explicit @@ -111,7 +115,7 @@ requestLoop: continue requestLoop } } - errOnce := proc.NewErrorOnce(r.done) + errOnce := proc.NewErrorOnce(done) errCh := r.Do(func() { var err error defer errOnce.Report(err) @@ -123,10 +127,10 @@ requestLoop: }) proc.OnError(errOnce.Send(errCh).Err(), func(err error) { log.Errorf("failed to run implicit reconciliation: %v", err) - }, r.done) + }, done) goto slowdown } - case <-r.done: + case <-done: return case <-r.explicit: // continue metrics.ReconciliationRequested.WithLabelValues("explicit").Inc() @@ -139,7 +143,7 @@ requestLoop: // play nice and wait for the prior operation to finish, complain // if it doesn't select { - case <-r.done: + case <-done: return case <-finished: // noop, expected case <-time.After(r.explicitReconciliationAbortTimeout): // very unexpected @@ -170,7 +174,7 @@ requestLoop: slowdown: // don't allow reconciliation to run very frequently, either explicit or implicit select { - case <-r.done: + case <-done: return case <-time.After(r.cooldown): // noop } diff --git a/contrib/mesos/pkg/scheduler/framework.go b/contrib/mesos/pkg/scheduler/framework.go index 499f0ad0a77..427c7b61838 100644 --- a/contrib/mesos/pkg/scheduler/framework.go +++ b/contrib/mesos/pkg/scheduler/framework.go @@ -53,9 +53,20 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/sets" + mscheduler "github.com/mesos/mesos-go/scheduler" ) -type Framework struct { +type Framework interface { + mscheduler.Scheduler + + Init(scheduler *Scheduler, electedMaster proc.Process, mux *http.ServeMux) error + Registration() <-chan struct{} + Offers() offers.Registry + LaunchTask(t *podtask.T) error + KillTask(id string) error +} + +type framework struct { // We use a lock here to avoid races // between invoking the mesos callback *sync.RWMutex @@ -71,7 +82,7 @@ type Framework struct { nodeRegistrator node.Registrator storeFrameworkId func(id string) - // Mesos context. + // Mesos context driver bindings.SchedulerDriver // late initialization frameworkId *mesos.FrameworkID masterInfo *mesos.MasterInfo @@ -82,7 +93,8 @@ type Framework struct { slaveHostNames *slave.Registry // via deferred init - tasksReconciler *taskreconciler.TasksReconciler + tasksReconciler taskreconciler.TasksReconciler + mux *http.ServeMux reconcileCooldown time.Duration asRegisteredMaster proc.Doer terminate <-chan struct{} // signal chan, closes when we should kill background tasks @@ -100,9 +112,9 @@ type Config struct { } // New creates a new Framework -func New(config Config) *Framework { - var k *Framework - k = &Framework{ +func New(config Config) Framework { + var k *framework + k = &framework{ schedulerConfig: &config.SchedulerConfig, RWMutex: new(sync.RWMutex), executor: config.Executor, @@ -156,10 +168,11 @@ func New(config Config) *Framework { return k } -func (k *Framework) Init(scheduler *Scheduler, electedMaster proc.Process, mux *http.ServeMux) error { +func (k *framework) Init(scheduler *Scheduler, electedMaster proc.Process, mux *http.ServeMux) error { log.V(1).Infoln("initializing kubernetes mesos scheduler") k.sched = scheduler + k.mux = mux k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error { if !k.registered { return proc.ErrorChanf("failed to execute action, scheduler is disconnected") @@ -168,18 +181,17 @@ func (k *Framework) Init(scheduler *Scheduler, electedMaster proc.Process, mux * }) k.terminate = electedMaster.Done() k.offers.Init(k.terminate) - k.InstallDebugHandlers(mux) k.nodeRegistrator.Run(k.terminate) return k.recoverTasks() } -func (k *Framework) asMaster() proc.Doer { +func (k *framework) asMaster() proc.Doer { k.RLock() defer k.RUnlock() return k.asRegisteredMaster } -func (k *Framework) InstallDebugHandlers(mux *http.ServeMux) { +func (k *framework) installDebugHandlers(mux *http.ServeMux) { wrappedHandler := func(uri string, h http.Handler) { mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) { ch := make(chan struct{}) @@ -227,12 +239,12 @@ func (k *Framework) InstallDebugHandlers(mux *http.ServeMux) { })) } -func (k *Framework) Registration() <-chan struct{} { +func (k *framework) Registration() <-chan struct{} { return k.registration } // Registered is called when the scheduler registered with the master successfully. -func (k *Framework) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) { +func (k *framework) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) { log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid) k.driver = drv @@ -246,7 +258,7 @@ func (k *Framework) Registered(drv bindings.SchedulerDriver, fid *mesos.Framewor // Reregistered is called when the scheduler re-registered with the master successfully. // This happends when the master fails over. -func (k *Framework) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { +func (k *framework) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) { log.Infof("Scheduler reregistered with the master: %v\n", mi) k.driver = drv @@ -258,7 +270,7 @@ func (k *Framework) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterI } // perform one-time initialization actions upon the first registration event received from Mesos. -func (k *Framework) onInitialRegistration(driver bindings.SchedulerDriver) { +func (k *framework) onInitialRegistration(driver bindings.SchedulerDriver) { defer close(k.registration) if k.failoverTimeout > 0 { @@ -276,17 +288,19 @@ func (k *Framework) onInitialRegistration(driver bindings.SchedulerDriver) { k.tasksReconciler = taskreconciler.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate) - go k.tasksReconciler.Run(driver) + go k.tasksReconciler.Run(driver, k.terminate) if k.reconcileInterval > 0 { ri := time.Duration(k.reconcileInterval) * time.Second time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.tasksReconciler.RequestImplicit, ri, k.terminate) }) log.Infof("will perform implicit task reconciliation at interval: %v after %v", ri, k.schedulerConfig.InitialImplicitReconciliationDelay.Duration) } + + k.installDebugHandlers(k.mux) } // Disconnected is called when the scheduler loses connection to the master. -func (k *Framework) Disconnected(driver bindings.SchedulerDriver) { +func (k *framework) Disconnected(driver bindings.SchedulerDriver) { log.Infof("Master disconnected!\n") k.registered = false @@ -296,7 +310,7 @@ func (k *Framework) Disconnected(driver bindings.SchedulerDriver) { } // ResourceOffers is called when the scheduler receives some offers from the master. -func (k *Framework) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) { +func (k *framework) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) { log.V(2).Infof("Received offers %+v", offers) // Record the offers in the global offer map as well as each slave's offer map. @@ -317,7 +331,7 @@ func (k *Framework) ResourceOffers(driver bindings.SchedulerDriver, offers []*me } // OfferRescinded is called when the resources are recinded from the scheduler. -func (k *Framework) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) { +func (k *framework) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) { log.Infof("Offer rescinded %v\n", offerId) oid := offerId.GetValue() @@ -325,7 +339,7 @@ func (k *Framework) OfferRescinded(driver bindings.SchedulerDriver, offerId *mes } // StatusUpdate is called when a status update message is sent to the scheduler. -func (k *Framework) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *framework) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { source, reason := "none", "none" if taskStatus.Source != nil { @@ -401,7 +415,7 @@ func (k *Framework) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *me } } -func (k *Framework) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *framework) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { task, state := k.sched.Tasks().UpdateStatus(taskStatus) if (state == podtask.StateRunning || state == podtask.StatePending) && @@ -441,7 +455,7 @@ func (k *Framework) reconcileTerminalTask(driver bindings.SchedulerDriver, taskS } // reconcile an unknown (from the perspective of our registry) non-terminal task -func (k *Framework) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { +func (k *framework) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) { // attempt to recover task from pod info: // - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil // - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace @@ -513,13 +527,13 @@ func (k *Framework) reconcileNonTerminalTask(driver bindings.SchedulerDriver, ta } // FrameworkMessage is called when the scheduler receives a message from the executor. -func (k *Framework) FrameworkMessage(driver bindings.SchedulerDriver, +func (k *framework) FrameworkMessage(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) { log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message) } // SlaveLost is called when some slave is lost. -func (k *Framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { +func (k *framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) { log.Infof("Slave %v is lost\n", slaveId) sid := slaveId.GetValue() @@ -534,14 +548,14 @@ func (k *Framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.Sl } // ExecutorLost is called when some executor is lost. -func (k *Framework) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { +func (k *framework) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) { log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status) // TODO(yifan): Restart any unfinished tasks of the executor. } // Error is called when there is an unrecoverable error in the scheduler or scheduler driver. // The driver should have been aborted before this is invoked. -func (k *Framework) Error(driver bindings.SchedulerDriver, message string) { +func (k *framework) Error(driver bindings.SchedulerDriver, message string) { log.Fatalf("fatal scheduler error: %v\n", message) } @@ -561,7 +575,7 @@ func explicitTaskFilter(t *podtask.T) bool { // invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation // is cancelled. if any other errors occur the composite reconciler will attempt to complete the // sequence, reporting only the last generated error. -func (k *Framework) makeCompositeReconciler(actions ...taskreconciler.ReconcilerAction) taskreconciler.ReconcilerAction { +func (k *framework) makeCompositeReconciler(actions ...taskreconciler.ReconcilerAction) taskreconciler.ReconcilerAction { if x := len(actions); x == 0 { // programming error panic("no actions specified for composite reconciler") @@ -612,7 +626,7 @@ func (k *Framework) makeCompositeReconciler(actions ...taskreconciler.Reconciler // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks listed in the scheduler's internal taskRegistry. -func (k *Framework) makeTaskRegistryReconciler() taskreconciler.ReconcilerAction { +func (k *framework) makeTaskRegistryReconciler() taskreconciler.ReconcilerAction { return taskreconciler.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { taskToSlave := make(map[string]string) for _, t := range k.sched.Tasks().List(explicitTaskFilter) { @@ -626,7 +640,7 @@ func (k *Framework) makeTaskRegistryReconciler() taskreconciler.ReconcilerAction // reconciler action factory, performs explicit task reconciliation for non-terminal // tasks identified by annotations in the Kubernetes pod registry. -func (k *Framework) makePodRegistryReconciler() taskreconciler.ReconcilerAction { +func (k *framework) makePodRegistryReconciler() taskreconciler.ReconcilerAction { return taskreconciler.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error { podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) if err != nil { @@ -652,7 +666,7 @@ func (k *Framework) makePodRegistryReconciler() taskreconciler.ReconcilerAction } // execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/ -func (k *Framework) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { +func (k *framework) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error { log.Info("explicit reconcile tasks") // tell mesos to send us the latest status updates for all the non-terminal tasks that we know about @@ -703,7 +717,7 @@ func (k *Framework) explicitlyReconcileTasks(driver bindings.SchedulerDriver, ta return nil } -func (ks *Framework) recoverTasks() error { +func (ks *framework) recoverTasks() error { podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) if err != nil { log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err) @@ -737,13 +751,13 @@ func (ks *Framework) recoverTasks() error { return nil } -func (ks *Framework) KillTask(id string) error { +func (ks *framework) KillTask(id string) error { killTaskId := mutil.NewTaskID(id) _, err := ks.driver.KillTask(killTaskId) return err } -func (ks *Framework) LaunchTask(t *podtask.T) error { +func (ks *framework) LaunchTask(t *podtask.T) error { // assume caller is holding scheduler lock taskList := []*mesos.TaskInfo{t.BuildTaskInfo(ks.executor)} offerIds := []*mesos.OfferID{t.Offer.Details().Id} @@ -752,6 +766,6 @@ func (ks *Framework) LaunchTask(t *podtask.T) error { return err } -func (ks *Framework) Offers() offers.Registry { +func (ks *framework) Offers() offers.Registry { return ks.offers } diff --git a/contrib/mesos/pkg/scheduler/framework_test.go b/contrib/mesos/pkg/scheduler/framework_test.go index ac298563b4d..91b3c98f277 100644 --- a/contrib/mesos/pkg/scheduler/framework_test.go +++ b/contrib/mesos/pkg/scheduler/framework_test.go @@ -95,7 +95,7 @@ func TestResourceOffer_Add(t *testing.T) { assert := assert.New(t) registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)} - testFramework := &Framework{ + testFramework := &framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -141,7 +141,7 @@ func TestResourceOffer_Add(t *testing.T) { func TestResourceOffer_Add_Rescind(t *testing.T) { assert := assert.New(t) - testFramework := &Framework{ + testFramework := &framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -198,7 +198,7 @@ func TestSlave_Lost(t *testing.T) { assert := assert.New(t) // - testFramework := &Framework{ + testFramework := &framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -256,7 +256,7 @@ func TestDisconnect(t *testing.T) { assert := assert.New(t) // - testFramework := &Framework{ + testFramework := &framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true @@ -300,7 +300,7 @@ func TestStatus_Update(t *testing.T) { // setup expectations mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil) - testFramework := &Framework{ + testFramework := &framework{ offers: offers.CreateRegistry(offers.RegistryConfig{ Compat: func(o *mesos.Offer) bool { return true diff --git a/contrib/mesos/pkg/scheduler/integration_test.go b/contrib/mesos/pkg/scheduler/integration_test.go index 0c32d688271..70f97c4312a 100644 --- a/contrib/mesos/pkg/scheduler/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration_test.go @@ -427,7 +427,7 @@ type lifecycleTest struct { driver *mmock.JoinableDriver eventObs *EventObserver podsListWatch *MockPodsListWatch - framework *Framework + framework Framework schedulerProc *ha.SchedulerProcess scheduler *Scheduler t *testing.T diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 05bbf26d7fc..4f8b82a1a13 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -42,16 +42,16 @@ import ( // Scheduler implements types.Scheduler type Scheduler struct { - podReconciler *podreconciler.PodReconciler - framework *Framework - loop *schedulerloop.SchedulerLoop + podReconciler podreconciler.PodReconciler + framework Framework + loop schedulerloop.SchedulerLoop // unsafe state, needs to be guarded, especially changes to podtask.T objects sync.RWMutex taskRegistry podtask.Registry } -func NewScheduler(c *config.Config, framework *Framework, podScheduler podschedulers.PodScheduler, +func NewScheduler(c *config.Config, framework Framework, podScheduler podschedulers.PodScheduler, client *client.Client, recorder record.EventRecorder, terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *Scheduler { core := &Scheduler{ @@ -107,7 +107,7 @@ func (c *Scheduler) Tasks() podtask.Registry { } func (c *Scheduler) Offers() offers.Registry { - return c.framework.offers + return c.framework.Offers() } func (c *Scheduler) KillTask(id string) error {