Introduce interfaces for all scheduler components

Dr. Stefan Schimanski 2015-11-03 07:52:42 +01:00
parent 29e58bab68
commit be57bd63e5
11 changed files with 133 additions and 94 deletions
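
Every component in the diff below follows the same refactoring pattern: the exported struct becomes an exported interface, the implementation moves to an unexported struct, and the constructor returns the interface instead of a pointer to the concrete type. A minimal, standalone sketch of that pattern — the names below are illustrative stand-ins, not code from this commit:

package binder

// Binder is the exported contract; callers depend only on this interface.
type Binder interface {
	Bind(binding *Binding) error
}

// binder is the unexported implementation hidden behind the interface.
type binder struct {
	sched Scheduler
}

// NewBinder returns the interface rather than *binder, so callers stay
// decoupled from the concrete type and tests can substitute a fake.
func NewBinder(sched Scheduler) Binder {
	return &binder{sched: sched}
}

func (b *binder) Bind(binding *Binding) error {
	// the real implementation launches the pod-associated task via b.sched
	return nil
}

// Binding and Scheduler are placeholders for the api.Binding and
// types.Scheduler types used by the actual code.
type Binding struct{ PodName, Host string }
type Scheduler interface{}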

View File

@@ -30,15 +30,19 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
)
+type SchedulerAlgorithm interface {
+Schedule(pod *api.Pod) (string, error)
+}
// SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
-type SchedulerAlgorithm struct {
+type schedulerAlgorithm struct {
sched types.Scheduler
podUpdates queue.FIFO
podScheduler podschedulers.PodScheduler
}
-func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) *SchedulerAlgorithm {
-return &SchedulerAlgorithm{
+func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) SchedulerAlgorithm {
+return &schedulerAlgorithm{
sched: sched,
podUpdates: podUpdates,
podScheduler: podScheduler,
@@ -47,7 +51,7 @@ func NewSchedulerAlgorithm(sched types.Scheduler, podUpdates queue.FIFO, podSche
// Schedule implements the Scheduler interface of Kubernetes.
// It returns the selectedMachine's name and error (if there's any).
-func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
+func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
log.Infof("Try to schedule pod %v\n", pod.Name)
ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
@@ -105,7 +109,7 @@ func (k *SchedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
}
// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
-func (k *SchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) {
+func (k *schedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) {
var offer offers.Perishable
if task.HasAcceptedOffer() {
// verify that the offer is still on the table

View File

@@ -28,18 +28,22 @@ import (
"k8s.io/kubernetes/pkg/api"
)
-type Binder struct {
+type Binder interface {
+Bind(binding *api.Binding) error
+}
+type binder struct {
sched types.Scheduler
}
-func NewBinder(sched types.Scheduler) *Binder {
-return &Binder{
+func NewBinder(sched types.Scheduler) Binder {
+return &binder{
sched: sched,
}
}
// implements binding.Registry, launches the pod-associated-task in mesos
-func (b *Binder) Bind(binding *api.Binding) error {
+func (b *binder) Bind(binding *api.Binding) error {
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
@@ -63,7 +67,7 @@ func (b *Binder) Bind(binding *api.Binding) error {
}
}
-func (b *Binder) rollback(task *podtask.T, err error) error {
+func (b *binder) rollback(task *podtask.T, err error) error {
task.Offer.Release()
task.Reset()
if err2 := b.sched.Tasks().Update(task); err2 != nil {
@@ -78,7 +82,7 @@ func (b *Binder) rollback(task *podtask.T, err error) error {
// kubernetes executor on the slave will finally do the binding. This is different from the
// upstream scheduler in the sense that the upstream scheduler does the binding and the
// kubelet will notice that and launches the pod.
-func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
+func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
// sanity check: ensure that the task hasAcceptedOffer(), it's possible that between
// Schedule() and now that the offer for this task was rescinded or invalidated.
// ((we should never see this here))
@@ -111,7 +115,7 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
}
//TODO(jdef) unit test this, ensure that task's copy of api.Pod is not modified
-func (b *Binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error {
+func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error {
pod := task.Pod
// we make an effort here to avoid making changes to the task's copy of the pod, since

View File

@@ -29,13 +29,18 @@ import (
"k8s.io/kubernetes/pkg/api"
)
-type Deleter struct {
+type Deleter interface {
+Run(updates <-chan queue.Entry, done <-chan struct{})
+DeleteOne(pod *queuer.Pod) error
+}
+type deleter struct {
sched types.Scheduler
qr *queuer.Queuer
}
-func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) *Deleter {
-return &Deleter{
+func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) Deleter {
+return &deleter{
sched: sched,
qr: qr,
}
@@ -43,7 +48,7 @@ func NewDeleter(sched types.Scheduler, qr *queuer.Queuer) *Deleter {
// currently monitors for "pod deleted" events, upon which handle()
// is invoked.
-func (k *Deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
+func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
go runtime.Until(func() {
for {
entry := <-updates
@@ -59,7 +64,7 @@ func (k *Deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
}, 1*time.Second, done)
}
-func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
+func (k *deleter) DeleteOne(pod *queuer.Pod) error {
ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
podKey, err := podtask.MakePodKey(ctx, pod.Name)
if err != nil {

View File

@@ -30,15 +30,19 @@ import (
"k8s.io/kubernetes/pkg/util"
)
-type ErrorHandler struct {
+type ErrorHandler interface {
+Error(pod *api.Pod, schedulingErr error)
+}
+type errorHandler struct {
sched types.Scheduler
backoff *backoff.Backoff
qr *queuer.Queuer
podScheduler podschedulers.PodScheduler
}
-func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) *ErrorHandler {
-return &ErrorHandler{
+func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) ErrorHandler {
+return &errorHandler{
sched: sched,
backoff: backoff,
qr: qr,
@@ -47,7 +51,7 @@ func NewErrorHandler(sched types.Scheduler, backoff *backoff.Backoff, qr *queuer
}
// implementation of scheduling plugin's Error func; see plugin/pkg/scheduler
-func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) {
+func (k *errorHandler) Error(pod *api.Pod, schedulingErr error) {
if schedulingErr == merrors.NoSuchPodErr {
log.V(2).Infof("Not rescheduling non-existent pod %v", pod.Name)

View File

@@ -31,15 +31,19 @@ import (
)
// PodReconciler reconciles a pod with the apiserver
-type PodReconciler struct {
+type PodReconciler interface {
+Reconcile(t *podtask.T)
+}
+type podReconciler struct {
sched types.Scheduler
client *client.Client
qr *queuer.Queuer
-deleter *deleter.Deleter
+deleter deleter.Deleter
}
-func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Queuer, deleter *deleter.Deleter) *PodReconciler {
-return &PodReconciler{
+func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Queuer, deleter deleter.Deleter) PodReconciler {
+return &podReconciler{
sched: sched,
client: client,
qr: qr,
@@ -58,7 +62,7 @@ func NewPodReconciler(sched types.Scheduler, client *client.Client, qr *queuer.Q
// host="..." | host="..." ; perhaps no updates to process?
//
// TODO(jdef) this needs an integration test
-func (s *PodReconciler) Reconcile(t *podtask.T) {
+func (s *podReconciler) Reconcile(t *podtask.T) {
log.V(1).Infof("reconcile pod %v, assigned to slave %q", t.Pod.Name, t.Spec.AssignedSlave)
ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace)
pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name)

View File

@@ -35,13 +35,13 @@ const (
Scheduled = "Scheduled"
)
-type SchedulerLoopInterface interface {
+type SchedulerLoop interface {
Run(<-chan struct{})
}
-type SchedulerLoop struct {
-algorithm *algorithm.SchedulerAlgorithm
-binder *binder.Binder
+type schedulerLoop struct {
+algorithm algorithm.SchedulerAlgorithm
+binder binder.Binder
nextPod func() *api.Pod
error func(*api.Pod, error)
recorder record.EventRecorder
@@ -49,10 +49,10 @@ type SchedulerLoop struct {
started chan<- struct{} // startup latch
}
-func NewSchedulerLoop(client *client.Client, algorithm *algorithm.SchedulerAlgorithm,
+func NewSchedulerLoop(client *client.Client, algorithm algorithm.SchedulerAlgorithm,
recorder record.EventRecorder, nextPod func() *api.Pod, error func(pod *api.Pod, schedulingErr error),
-binder *binder.Binder, started chan<- struct{}) *SchedulerLoop {
-return &SchedulerLoop{
+binder binder.Binder, started chan<- struct{}) SchedulerLoop {
+return &schedulerLoop{
algorithm: algorithm,
binder: binder,
nextPod: nextPod,
@@ -63,14 +63,14 @@ func NewSchedulerLoop(client *client.Client, algorithm *algorithm.SchedulerAlgor
}
}
-func (s *SchedulerLoop) Run(done <-chan struct{}) {
+func (s *schedulerLoop) Run(done <-chan struct{}) {
defer close(s.started)
go runtime.Until(s.scheduleOne, recoveryDelay, done)
}
// hacked from GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go,
// with the Modeler stuff removed since we don't use it because we have mesos.
-func (s *SchedulerLoop) scheduleOne() {
+func (s *schedulerLoop) scheduleOne() {
pod := s.nextPod()
// pods which are pre-scheduled (i.e. NodeName is set) are deleted by the kubelet

View File

@@ -29,25 +29,29 @@ import (
type ReconcilerAction func(driver bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error
-type TasksReconciler struct {
+type TasksReconciler interface {
+RequestExplicit()
+RequestImplicit()
+Run(driver bindings.SchedulerDriver, done <-chan struct{})
+}
+type tasksReconciler struct {
proc.Doer
Action ReconcilerAction
explicit chan struct{} // send an empty struct to trigger explicit reconciliation
implicit chan struct{} // send an empty struct to trigger implicit reconciliation
-done <-chan struct{} // close this when you want the reconciler to exit
cooldown time.Duration
explicitReconciliationAbortTimeout time.Duration
}
func NewTasksReconciler(doer proc.Doer, action ReconcilerAction,
-cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) *TasksReconciler {
-return &TasksReconciler{
+cooldown, explicitReconciliationAbortTimeout time.Duration, done <-chan struct{}) TasksReconciler {
+return &tasksReconciler{
Doer: doer,
explicit: make(chan struct{}, 1),
implicit: make(chan struct{}, 1),
cooldown: cooldown,
explicitReconciliationAbortTimeout: explicitReconciliationAbortTimeout,
-done: done,
Action: func(driver bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
// trigged the reconciler action in the doer's execution context,
// but it could take a while and the scheduler needs to be able to
@@ -67,14 +71,14 @@ func NewTasksReconciler(doer proc.Doer, action ReconcilerAction,
}
}
-func (r *TasksReconciler) RequestExplicit() {
+func (r *tasksReconciler) RequestExplicit() {
select {
case r.explicit <- struct{}{}: // noop
default: // request queue full; noop
}
}
-func (r *TasksReconciler) RequestImplicit() {
+func (r *tasksReconciler) RequestImplicit() {
select {
case r.implicit <- struct{}{}: // noop
default: // request queue full; noop
@@ -84,12 +88,12 @@ func (r *TasksReconciler) RequestImplicit() {
// execute task reconciliation, returns when r.done is closed. intended to run as a goroutine.
// if reconciliation is requested while another is in progress, the in-progress operation will be
// cancelled before the new reconciliation operation begins.
-func (r *TasksReconciler) Run(driver bindings.SchedulerDriver) {
+func (r *tasksReconciler) Run(driver bindings.SchedulerDriver, done <-chan struct{}) {
var cancel, finished chan struct{}
requestLoop:
for {
select {
-case <-r.done:
+case <-done:
return
default: // proceed
}
@@ -97,7 +101,7 @@ requestLoop:
case <-r.implicit:
metrics.ReconciliationRequested.WithLabelValues("implicit").Inc()
select {
-case <-r.done:
+case <-done:
return
case <-r.explicit:
break // give preference to a pending request for explicit
@@ -111,7 +115,7 @@ requestLoop:
continue requestLoop
}
}
-errOnce := proc.NewErrorOnce(r.done)
+errOnce := proc.NewErrorOnce(done)
errCh := r.Do(func() {
var err error
defer errOnce.Report(err)
@@ -123,10 +127,10 @@ requestLoop:
})
proc.OnError(errOnce.Send(errCh).Err(), func(err error) {
log.Errorf("failed to run implicit reconciliation: %v", err)
-}, r.done)
+}, done)
goto slowdown
}
-case <-r.done:
+case <-done:
return
case <-r.explicit: // continue
metrics.ReconciliationRequested.WithLabelValues("explicit").Inc()
@@ -139,7 +143,7 @@ requestLoop:
// play nice and wait for the prior operation to finish, complain
// if it doesn't
select {
-case <-r.done:
+case <-done:
return
case <-finished: // noop, expected
case <-time.After(r.explicitReconciliationAbortTimeout): // very unexpected
@@ -170,7 +174,7 @@ requestLoop:
slowdown:
// don't allow reconciliation to run very frequently, either explicit or implicit
select {
-case <-r.done:
+case <-done:
return
case <-time.After(r.cooldown): // noop
}

View File

@@ -53,9 +53,20 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
+mscheduler "github.com/mesos/mesos-go/scheduler"
)
-type Framework struct {
+type Framework interface {
+mscheduler.Scheduler
+Init(scheduler *Scheduler, electedMaster proc.Process, mux *http.ServeMux) error
+Registration() <-chan struct{}
+Offers() offers.Registry
+LaunchTask(t *podtask.T) error
+KillTask(id string) error
+}
+type framework struct {
// We use a lock here to avoid races
// between invoking the mesos callback
*sync.RWMutex
@@ -71,7 +82,7 @@ type Framework struct {
nodeRegistrator node.Registrator
storeFrameworkId func(id string)
-// Mesos context.
+// Mesos context
driver bindings.SchedulerDriver // late initialization
frameworkId *mesos.FrameworkID
masterInfo *mesos.MasterInfo
@@ -82,7 +93,8 @@ type Framework struct {
slaveHostNames *slave.Registry
// via deferred init
-tasksReconciler *taskreconciler.TasksReconciler
+tasksReconciler taskreconciler.TasksReconciler
+mux *http.ServeMux
reconcileCooldown time.Duration
asRegisteredMaster proc.Doer
terminate <-chan struct{} // signal chan, closes when we should kill background tasks
@@ -100,9 +112,9 @@ type Config struct {
}
// New creates a new Framework
-func New(config Config) *Framework {
-var k *Framework
-k = &Framework{
+func New(config Config) Framework {
+var k *framework
+k = &framework{
schedulerConfig: &config.SchedulerConfig,
RWMutex: new(sync.RWMutex),
executor: config.Executor,
@@ -156,10 +168,11 @@ func New(config Config) *Framework {
return k
}
-func (k *Framework) Init(scheduler *Scheduler, electedMaster proc.Process, mux *http.ServeMux) error {
+func (k *framework) Init(scheduler *Scheduler, electedMaster proc.Process, mux *http.ServeMux) error {
log.V(1).Infoln("initializing kubernetes mesos scheduler")
k.sched = scheduler
+k.mux = mux
k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error {
if !k.registered {
return proc.ErrorChanf("failed to execute action, scheduler is disconnected")
@@ -168,18 +181,17 @@ func (k *Framework) Init(scheduler *Scheduler, electedMaster proc.Process, mux *
})
k.terminate = electedMaster.Done()
k.offers.Init(k.terminate)
-k.InstallDebugHandlers(mux)
k.nodeRegistrator.Run(k.terminate)
return k.recoverTasks()
}
-func (k *Framework) asMaster() proc.Doer {
+func (k *framework) asMaster() proc.Doer {
k.RLock()
defer k.RUnlock()
return k.asRegisteredMaster
}
-func (k *Framework) InstallDebugHandlers(mux *http.ServeMux) {
+func (k *framework) installDebugHandlers(mux *http.ServeMux) {
wrappedHandler := func(uri string, h http.Handler) {
mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
ch := make(chan struct{})
@@ -227,12 +239,12 @@ func (k *Framework) InstallDebugHandlers(mux *http.ServeMux) {
}))
}
-func (k *Framework) Registration() <-chan struct{} {
+func (k *framework) Registration() <-chan struct{} {
return k.registration
}
// Registered is called when the scheduler registered with the master successfully.
-func (k *Framework) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) {
+func (k *framework) Registered(drv bindings.SchedulerDriver, fid *mesos.FrameworkID, mi *mesos.MasterInfo) {
log.Infof("Scheduler registered with the master: %v with frameworkId: %v\n", mi, fid)
k.driver = drv
@@ -246,7 +258,7 @@ func (k *Framework) Registered(drv bindings.SchedulerDriver, fid *mesos.Framewor
// Reregistered is called when the scheduler re-registered with the master successfully.
// This happends when the master fails over.
-func (k *Framework) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) {
+func (k *framework) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterInfo) {
log.Infof("Scheduler reregistered with the master: %v\n", mi)
k.driver = drv
@@ -258,7 +270,7 @@ func (k *Framework) Reregistered(drv bindings.SchedulerDriver, mi *mesos.MasterI
}
// perform one-time initialization actions upon the first registration event received from Mesos.
-func (k *Framework) onInitialRegistration(driver bindings.SchedulerDriver) {
+func (k *framework) onInitialRegistration(driver bindings.SchedulerDriver) {
defer close(k.registration)
if k.failoverTimeout > 0 {
@@ -276,17 +288,19 @@ func (k *Framework) onInitialRegistration(driver bindings.SchedulerDriver) {
k.tasksReconciler = taskreconciler.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate)
-go k.tasksReconciler.Run(driver)
+go k.tasksReconciler.Run(driver, k.terminate)
if k.reconcileInterval > 0 {
ri := time.Duration(k.reconcileInterval) * time.Second
time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.tasksReconciler.RequestImplicit, ri, k.terminate) })
log.Infof("will perform implicit task reconciliation at interval: %v after %v", ri, k.schedulerConfig.InitialImplicitReconciliationDelay.Duration)
}
+k.installDebugHandlers(k.mux)
}
// Disconnected is called when the scheduler loses connection to the master.
-func (k *Framework) Disconnected(driver bindings.SchedulerDriver) {
+func (k *framework) Disconnected(driver bindings.SchedulerDriver) {
log.Infof("Master disconnected!\n")
k.registered = false
@@ -296,7 +310,7 @@ func (k *Framework) Disconnected(driver bindings.SchedulerDriver) {
}
// ResourceOffers is called when the scheduler receives some offers from the master.
-func (k *Framework) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) {
+func (k *framework) ResourceOffers(driver bindings.SchedulerDriver, offers []*mesos.Offer) {
log.V(2).Infof("Received offers %+v", offers)
// Record the offers in the global offer map as well as each slave's offer map.
@@ -317,7 +331,7 @@ func (k *Framework) ResourceOffers(driver bindings.SchedulerDriver, offers []*me
}
// OfferRescinded is called when the resources are recinded from the scheduler.
-func (k *Framework) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) {
+func (k *framework) OfferRescinded(driver bindings.SchedulerDriver, offerId *mesos.OfferID) {
log.Infof("Offer rescinded %v\n", offerId)
oid := offerId.GetValue()
@@ -325,7 +339,7 @@ func (k *Framework) OfferRescinded(driver bindings.SchedulerDriver, offerId *mes
}
// StatusUpdate is called when a status update message is sent to the scheduler.
-func (k *Framework) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
+func (k *framework) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
source, reason := "none", "none"
if taskStatus.Source != nil {
@@ -401,7 +415,7 @@ func (k *Framework) StatusUpdate(driver bindings.SchedulerDriver, taskStatus *me
}
}
-func (k *Framework) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
+func (k *framework) reconcileTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
task, state := k.sched.Tasks().UpdateStatus(taskStatus)
if (state == podtask.StateRunning || state == podtask.StatePending) &&
@@ -441,7 +455,7 @@ func (k *Framework) reconcileTerminalTask(driver bindings.SchedulerDriver, taskS
}
// reconcile an unknown (from the perspective of our registry) non-terminal task
-func (k *Framework) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
+func (k *framework) reconcileNonTerminalTask(driver bindings.SchedulerDriver, taskStatus *mesos.TaskStatus) {
// attempt to recover task from pod info:
// - task data may contain an api.PodStatusResult; if status.reason == REASON_RECONCILIATION then status.data == nil
// - the Name can be parsed by container.ParseFullName() to yield a pod Name and Namespace
@@ -513,13 +527,13 @@ func (k *Framework) reconcileNonTerminalTask(driver bindings.SchedulerDriver, ta
}
// FrameworkMessage is called when the scheduler receives a message from the executor.
-func (k *Framework) FrameworkMessage(driver bindings.SchedulerDriver,
+func (k *framework) FrameworkMessage(driver bindings.SchedulerDriver,
executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, message string) {
log.Infof("Received messages from executor %v of slave %v, %v\n", executorId, slaveId, message)
}
// SlaveLost is called when some slave is lost.
-func (k *Framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) {
+func (k *framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.SlaveID) {
log.Infof("Slave %v is lost\n", slaveId)
sid := slaveId.GetValue()
@@ -534,14 +548,14 @@ func (k *Framework) SlaveLost(driver bindings.SchedulerDriver, slaveId *mesos.Sl
}
// ExecutorLost is called when some executor is lost.
-func (k *Framework) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) {
+func (k *framework) ExecutorLost(driver bindings.SchedulerDriver, executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, status int) {
log.Infof("Executor %v of slave %v is lost, status: %v\n", executorId, slaveId, status)
// TODO(yifan): Restart any unfinished tasks of the executor.
}
// Error is called when there is an unrecoverable error in the scheduler or scheduler driver.
// The driver should have been aborted before this is invoked.
-func (k *Framework) Error(driver bindings.SchedulerDriver, message string) {
+func (k *framework) Error(driver bindings.SchedulerDriver, message string) {
log.Fatalf("fatal scheduler error: %v\n", message)
}
@@ -561,7 +575,7 @@ func explicitTaskFilter(t *podtask.T) bool {
// invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation
// is cancelled. if any other errors occur the composite reconciler will attempt to complete the
// sequence, reporting only the last generated error.
-func (k *Framework) makeCompositeReconciler(actions ...taskreconciler.ReconcilerAction) taskreconciler.ReconcilerAction {
+func (k *framework) makeCompositeReconciler(actions ...taskreconciler.ReconcilerAction) taskreconciler.ReconcilerAction {
if x := len(actions); x == 0 {
// programming error
panic("no actions specified for composite reconciler")
@@ -612,7 +626,7 @@ func (k *Framework) makeCompositeReconciler(actions ...taskreconciler.Reconciler
// reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks listed in the scheduler's internal taskRegistry.
-func (k *Framework) makeTaskRegistryReconciler() taskreconciler.ReconcilerAction {
+func (k *framework) makeTaskRegistryReconciler() taskreconciler.ReconcilerAction {
return taskreconciler.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
taskToSlave := make(map[string]string)
for _, t := range k.sched.Tasks().List(explicitTaskFilter) {
@@ -626,7 +640,7 @@ func (k *Framework) makeTaskRegistryReconciler() taskreconciler.ReconcilerAction
// reconciler action factory, performs explicit task reconciliation for non-terminal
// tasks identified by annotations in the Kubernetes pod registry.
-func (k *Framework) makePodRegistryReconciler() taskreconciler.ReconcilerAction {
+func (k *framework) makePodRegistryReconciler() taskreconciler.ReconcilerAction {
return taskreconciler.ReconcilerAction(func(drv bindings.SchedulerDriver, cancel <-chan struct{}) <-chan error {
podList, err := k.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil {
@@ -652,7 +666,7 @@ func (k *Framework) makePodRegistryReconciler() taskreconciler.ReconcilerAction
}
// execute an explicit task reconciliation, as per http://mesos.apache.org/documentation/latest/reconciliation/
-func (k *Framework) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error {
+func (k *framework) explicitlyReconcileTasks(driver bindings.SchedulerDriver, taskToSlave map[string]string, cancel <-chan struct{}) error {
log.Info("explicit reconcile tasks")
// tell mesos to send us the latest status updates for all the non-terminal tasks that we know about
@@ -703,7 +717,7 @@ func (k *Framework) explicitlyReconcileTasks(driver bindings.SchedulerDriver, ta
return nil
}
-func (ks *Framework) recoverTasks() error {
+func (ks *framework) recoverTasks() error {
podList, err := ks.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil {
log.V(1).Infof("failed to recover pod registry, madness may ensue: %v", err)
@@ -737,13 +751,13 @@ func (ks *Framework) recoverTasks() error {
return nil
}
-func (ks *Framework) KillTask(id string) error {
+func (ks *framework) KillTask(id string) error {
killTaskId := mutil.NewTaskID(id)
_, err := ks.driver.KillTask(killTaskId)
return err
}
-func (ks *Framework) LaunchTask(t *podtask.T) error {
+func (ks *framework) LaunchTask(t *podtask.T) error {
// assume caller is holding scheduler lock
taskList := []*mesos.TaskInfo{t.BuildTaskInfo(ks.executor)}
offerIds := []*mesos.OfferID{t.Offer.Details().Id}
@@ -752,6 +766,6 @@ func (ks *Framework) LaunchTask(t *podtask.T) error {
return err
}
-func (ks *Framework) Offers() offers.Registry {
+func (ks *framework) Offers() offers.Registry {
return ks.offers
}

View File

@@ -95,7 +95,7 @@ func TestResourceOffer_Add(t *testing.T) {
assert := assert.New(t)
registrator := &mockRegistrator{cache.NewStore(cache.MetaNamespaceKeyFunc)}
-testFramework := &Framework{
+testFramework := &framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -141,7 +141,7 @@ func TestResourceOffer_Add(t *testing.T) {
func TestResourceOffer_Add_Rescind(t *testing.T) {
assert := assert.New(t)
-testFramework := &Framework{
+testFramework := &framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -198,7 +198,7 @@ func TestSlave_Lost(t *testing.T) {
assert := assert.New(t)
//
-testFramework := &Framework{
+testFramework := &framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -256,7 +256,7 @@ func TestDisconnect(t *testing.T) {
assert := assert.New(t)
//
-testFramework := &Framework{
+testFramework := &framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true
@@ -300,7 +300,7 @@ func TestStatus_Update(t *testing.T) {
// setup expectations
mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil)
-testFramework := &Framework{
+testFramework := &framework{
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
return true

View File

@@ -427,7 +427,7 @@ type lifecycleTest struct {
driver *mmock.JoinableDriver
eventObs *EventObserver
podsListWatch *MockPodsListWatch
-framework *Framework
+framework Framework
schedulerProc *ha.SchedulerProcess
scheduler *Scheduler
t *testing.T

View File

@@ -42,16 +42,16 @@ import (
// Scheduler implements types.Scheduler
type Scheduler struct {
-podReconciler *podreconciler.PodReconciler
-framework *Framework
-loop *schedulerloop.SchedulerLoop
+podReconciler podreconciler.PodReconciler
+framework Framework
+loop schedulerloop.SchedulerLoop
// unsafe state, needs to be guarded, especially changes to podtask.T objects
sync.RWMutex
taskRegistry podtask.Registry
}
-func NewScheduler(c *config.Config, framework *Framework, podScheduler podschedulers.PodScheduler,
+func NewScheduler(c *config.Config, framework Framework, podScheduler podschedulers.PodScheduler,
client *client.Client, recorder record.EventRecorder, terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *Scheduler {
core := &Scheduler{
@@ -107,7 +107,7 @@ func (c *Scheduler) Tasks() podtask.Registry {
}
func (c *Scheduler) Offers() offers.Registry {
-return c.framework.offers
+return c.framework.Offers()
}
func (c *Scheduler) KillTask(id string) error {
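
The practical payoff of returning interfaces shows up in wiring code like the Scheduler above and in tests: components now hold binder.Binder, algorithm.SchedulerAlgorithm, Framework, and so on instead of concrete pointers, so any type with the right method set can be dropped in. A hypothetical test fake (not part of this commit) satisfying two of the new interfaces:

package schedulerloop_test // hypothetical test package, not in this commit

import "k8s.io/kubernetes/pkg/api"

// stubAlgorithm satisfies the SchedulerAlgorithm interface by always
// placing pods on a fixed node.
type stubAlgorithm struct{ node string }

func (s *stubAlgorithm) Schedule(pod *api.Pod) (string, error) {
	return s.node, nil
}

// stubBinder satisfies the Binder interface and just records the
// bindings it is asked to perform.
type stubBinder struct{ bound []*api.Binding }

func (s *stubBinder) Bind(binding *api.Binding) error {
	s.bound = append(s.bound, binding)
	return nil
}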