Remove Offers() dependency from ErrorHandler by moving out BreakChan factory

This commit is contained in:
Dr. Stefan Schimanski 2015-11-04 12:17:49 +01:00
parent 4d99ee7e54
commit ddcdf6a798
4 changed files with 55 additions and 52 deletions

Binary file not shown.

View File

@ -18,7 +18,6 @@ package errorhandler
import ( import (
log "github.com/golang/glog" log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/backoff" "k8s.io/kubernetes/contrib/mesos/pkg/backoff"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
@ -38,15 +37,15 @@ type errorHandler struct {
sched scheduler.Scheduler sched scheduler.Scheduler
backoff *backoff.Backoff backoff *backoff.Backoff
qr *queuer.Queuer qr *queuer.Queuer
podScheduler podschedulers.PodScheduler newBreakChan func(podKey string) queue.BreakChan
} }
func New(sched scheduler.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, podScheduler podschedulers.PodScheduler) ErrorHandler { func New(sched scheduler.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer, newBC func(podKey string) queue.BreakChan) ErrorHandler {
return &errorHandler{ return &errorHandler{
sched: sched, sched: sched,
backoff: backoff, backoff: backoff,
qr: qr, qr: qr,
podScheduler: podScheduler, newBreakChan: newBC,
} }
} }
@ -87,21 +86,7 @@ func (k *errorHandler) Error(pod *api.Pod, schedulingErr error) {
breakoutEarly := queue.BreakChan(nil) breakoutEarly := queue.BreakChan(nil)
if schedulingErr == podschedulers.NoSuitableOffersErr { if schedulingErr == podschedulers.NoSuitableOffersErr {
log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) log.V(3).Infof("adding backoff breakout handler for pod %v", podKey)
breakoutEarly = queue.BreakChan(k.sched.Offers().Listen(podKey, func(offer *mesos.Offer) bool { breakoutEarly = k.newBreakChan(podKey)
k.sched.Lock()
defer k.sched.Unlock()
switch task, state := k.sched.Tasks().Get(task.ID); state {
case podtask.StatePending:
// Assess fitness of pod with the current offer. The scheduler normally
// "backs off" when it can't find an offer that matches up with a pod.
// The backoff period for a pod can terminate sooner if an offer becomes
// available that matches up.
return !task.Has(podtask.Launched) && k.podScheduler.FitPredicate()(task, offer, nil)
default:
// no point in continuing to check for matching offers
return true
}
}))
} }
delay := k.backoff.Get(podKey) delay := k.backoff.Get(podKey)
log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay) log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay)

View File

@ -20,6 +20,7 @@ import (
"net/http" "net/http"
"sync" "sync"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/backoff" "k8s.io/kubernetes/contrib/mesos/pkg/backoff"
"k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
@ -75,7 +76,24 @@ func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler
core.podReconciler = podreconciler.New(core, client, q, podDeleter) core.podReconciler = podreconciler.New(core, client, q, podDeleter)
bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration) bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration)
errorHandler := errorhandler.New(core, bo, q, ps) newBC := func(podKey string) queue.BreakChan {
return queue.BreakChan(core.Offers().Listen(podKey, func(offer *mesos.Offer) bool {
core.Lock()
defer core.Unlock()
switch task, state := core.Tasks().ForPod(podKey); state {
case podtask.StatePending:
// Assess fitness of pod with the current offer. The scheduler normally
// "backs off" when it can't find an offer that matches up with a pod.
// The backoff period for a pod can terminate sooner if an offer becomes
// available that matches up.
return !task.Has(podtask.Launched) && ps.FitPredicate()(task, offer, nil)
default:
// no point in continuing to check for matching offers
return true
}
}))
}
errorHandler := errorhandler.New(core, bo, q, newBC)
binder := binder.New(core) binder := binder.New(core)

View File

@ -28,38 +28,38 @@ package scheduler
// │ │ │ // │ │ │
// │ │ │ // │ │ │
// └───────────────┐┌───────────────────▲────────────────────▲─────────────────────┐ └───────────────────────┐ // └───────────────┐┌───────────────────▲────────────────────▲─────────────────────┐ └───────────────────────┐
// ││ │ │ // ││ │ │ ┌────────────────────┼─────────────────┐
// ┌───────────────────┼┼──────────────────────────────────────┐ │ ┌───────────────────┼────────────────┐ // ┌───────────────────┼┼──────────────────────────────────────┐ │ ┌───────────────────┼────┼───────────┐ │
// ┌───────────▼──────────┐┌───────┴┴───────┐ ┌───────────────────┐ ┌──┴─┴─┴──────┐ ┌────────┴───────┐ ┌────▼────────▼─────────────┐ // ┌───────────▼──────────┐┌───────┴┴───────┐ ┌───────────────────┐ ┌──┴─┴─┴──────┐ ┌────────┴───────┐ ┌────▼────────▼─────────────┐
// │Binder (task launcher)││Deleter │ │PodReconciler │ │SchedulerLoop│ │ ErrorHandler │ │SchedulerAlgorithm │ // │Binder (task launcher)││Deleter │ │PodReconciler │ │SchedulerLoop│ │ ErrorHandler │ │SchedulerAlgorithm │
// │- Bind(binding) ││- DeleteOne(pod)│ │- Reconcile(pod) │ │- Run() │ │- Error(pod, err)│ │- Schedule(pod) -> NodeName│ // │- Bind(binding) ││- DeleteOne(pod)│ │- Reconcile(pod) │ │- Run() │ │- Error(pod, err)│ │- Schedule(pod) -> NodeName│
// │ ││ │◀──│ │ │ │──▶│ │ │ │ // │ ││ │◀──│ │ │ │──▶│ │ │ │
// │ ┌─────┐││ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │┌─────┐ │ // │ ┌─────┐││ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │┌─────┐ │
// └───────────────┤sched├┘└────┤sched├─────┘ └──────┤sched├───▲──┘ └───┤sched├───┘ └────┤sched├──────┘ └┤sched├────────────────────┘ // └───────────────┤sched├┘└────┤sched├─────┘ └──────┤sched├───▲──┘ └───┤sched├───┘ └────┤sched├──────┘ └┤sched├──────────────┬─────┘ │
// ├-│││-┴──────┴--││-┴────────────────┴--│--┴───┼──────────┴--│--┴────────────┴-│││-┴──────────┴-│││-┴─────┐ ┌──────────────────────┐ // ├-│││-┴──────┴--││-┴────────────────┴--│--┴───┼──────────┴--│--┴────────────┴-│---┴──────────┴-│││-┤ ┌────────────▼─────────▼─────────┐
// │ │││ ││ │ │ │ │││ │││ │ │ // │ │││ ││ │ │ │ │ │││ │ │ podScheduler
// │ ││└───────────▼┼─────────────────────▼──────┼─────────────▼─────────────────▼┼┼──────────────┘││ │ │ A ──────────▶ B // │ ││└───────────▼┼─────────────────────▼──────┼─────────────▼─────────────────▼────────────────┘││ │ │ (e.g. fcfsPodScheduler)
// │ │└─────────────┼────────────────────────────┼─────────────┼──────────────────▼───────────────┘│ // │ │└─────────────┼────────────────────────────┼─────────────┼──────────────────▼───────────────┘│ │ │
// │ │ │ │ │ │└──────────┐ │ │ │ A has a reference // │ │ │ │ │ │ │ │ │ scheduleOne(pod, offers ...)
// │ │ │ │ │ │ │ │ │ │ on B and calls B │ // │ │ │ │ │ │ │ │ │ ┌──────────────────────────┤
// │ │ │ ╲ │ │ │ │ │ │ ┌─▼─────▼──────┐│ │ // │ │ │ ╲ │ │ │ │ │ │ ▼ │ │ │ allocationStrategy
// │ │ │ ╲ └┐ │ ┌┘ │ │ │ │PodScheduler()││ └──────────────────────┘ // │ │ │ ╲ └┐ │ ┌┘ │ │ │ │ │ │ - FitPredicate │
// │ │ │ ╲ │ │ │ │ │ │ └─┼─────┬──────┘ // │ │ │ ╲ │ │ │ │ │ │ │ │ │ - Procurement
// │ │ │ ╲ └┐ │ ┌┘ │ │ │ ┌─┼─────▼───────┴────────────────┐ // │ │ │ ╲ └┐ │ ┌┘ │ │ │ │ └─────┴──────────────────────────┘
// │┌▼────────────┐┌▼──────────┐┌─▼─▼─▼─▼─▼─┐┌───┴────────┐┌───▼───┐ ┌────▼───┐ │ │ podScheduler // │┌▼────────────┐┌▼──────────┐┌─▼─▼─▼─▼─▼─┐┌───┴────────┐┌───▼───┐ ┌────▼───┐
// ││LaunchTask(t)││KillTask(t)││sync.Mutex ││reconcile(t)││Tasks()│ │Offers()│ │ │ (e.g. fcfsPodScheduler) // ││LaunchTask(t)││KillTask(t)││sync.Mutex ││reconcile(t)││Tasks()│ │Offers()│
// │└──────┬──────┘└─────┬─────┘└───────────┘└────────▲───┘└───┬───┘ └────┬───┘ │ │ // │└──────┬──────┘└─────┬─────┘└───────────┘└────────▲───┘└───┬───┘ └────┬───┘
// │ │ │ │ │ │ │ │scheduleOne(pod, offers ...) // │ │ │ │ │ │
// │ │ └──────────────────┐ │ ┌───▼────────────┐ │ │ │ ┌──────────────────────────┤ // │ │ └──────────────────┐ │ ┌───▼────────────┐ │
// │ └──────────────────────────────┐ │ │ │podtask.Registry│ │ allocationStrategy // │ └──────────────────────────────┐ │ │ │podtask.Registry│ │
// │ │ │ │ └────────────────┘ │ │ └───┼────▶ - FitPredicate │ // │ │ │ │ └────────────────┘ │ │ ┌──────────────────────┐
// │ │ │ │ │ │ │ - Procurement // │ │ │ │ │ │ │
// │Scheduler │ └──────┐ │ │ └─────┴─────────┬────────────────┘ // │Scheduler │ └──────┐ │ │ │ │ A ──────────▶ B │
// └──────────────────────────────────────┼────────┼─┬│----┬──────────────────────┼───────────────────────── // └──────────────────────────────────────┼────────┼─┬│----┬──────────────────────┼─────────────────── │ │
// ┌──────────────────────────────────────┼────────┼─┤sched├──────────────────────┼─────────────────────────┐ // ┌──────────────────────────────────────┼────────┼─┤sched├──────────────────────┼─────────────────────────┐ │ A has a reference │
// │Framework │ │ └─────┘ ┌────▼───┐ │ // │Framework │ │ └─────┘ ┌────▼───┐ │ │ on B and calls B │
// │ ┌──────▼──────┐┌▼──────────┐ │Offers()│ │ // │ ┌──────▼──────┐┌▼──────────┐ │Offers()│ │ │ │
// │ │LaunchTask(t)││KillTask(t)│ └────┬───┘ │ // │ │LaunchTask(t)││KillTask(t)│ └────┬───┘ │ └──────────────────────┘
// │ └─────────┬───┘└──────┬────┘ ┌────────▼───────┐ │ // │ └─────────┬───┘└──────┬────┘ ┌────────▼───────┐ │
// │implements: mesos-go/scheduler.Scheduler └───────────▼ │offers.Registry │ │ // │implements: mesos-go/scheduler.Scheduler └───────────▼ │offers.Registry │ │
// │ │ └────────────────┘ │ // │ │ └────────────────┘ │