Rename api.Scheduler -> types.Framework

This commit is contained in:
Dr. Stefan Schimanski 2015-10-26 14:42:25 -05:00
parent de8b958b2f
commit f4e1de55d6
10 changed files with 95 additions and 95 deletions

View File

@ -957,7 +957,7 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se
Config: &plugin.Config{ Config: &plugin.Config{
NodeLister: nil, NodeLister: nil,
Algorithm: &schedulerApiAlgorithmAdapter{ Algorithm: &schedulerApiAlgorithmAdapter{
scheduler: scheduler, fw: scheduler,
podUpdates: podUpdates, podUpdates: podUpdates,
}, },
Binder: operations.NewBinder(scheduler), Binder: operations.NewBinder(scheduler),
@ -965,10 +965,10 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se
Error: eh.Error, Error: eh.Error,
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
}, },
scheduler: scheduler, fw: scheduler,
client: k.client, client: k.client,
qr: q, qr: q,
deleter: podDeleter, deleter: podDeleter,
starting: startLatch, starting: startLatch,
} }
} }

View File

@ -21,20 +21,20 @@ import (
"strconv" "strconv"
log "github.com/golang/glog" log "github.com/golang/glog"
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
) )
type Binder struct { type Binder struct {
scheduler schedapi.Scheduler fw types.Framework
} }
func NewBinder(scheduler schedapi.Scheduler) *Binder { func NewBinder(fw types.Framework) *Binder {
return &Binder{ return &Binder{
scheduler: scheduler, fw: fw,
} }
} }
@ -49,10 +49,10 @@ func (b *Binder) Bind(binding *api.Binding) error {
return err return err
} }
b.scheduler.Lock() b.fw.Lock()
defer b.scheduler.Unlock() defer b.fw.Unlock()
switch task, state := b.scheduler.Tasks().ForPod(podKey); state { switch task, state := b.fw.Tasks().ForPod(podKey); state {
case podtask.StatePending: case podtask.StatePending:
return b.bind(ctx, binding, task) return b.bind(ctx, binding, task)
default: default:
@ -66,7 +66,7 @@ func (b *Binder) Bind(binding *api.Binding) error {
func (b *Binder) rollback(task *podtask.T, err error) error { func (b *Binder) rollback(task *podtask.T, err error) error {
task.Offer.Release() task.Offer.Release()
task.Reset() task.Reset()
if err2 := b.scheduler.Tasks().Update(task); err2 != nil { if err2 := b.fw.Tasks().Update(task); err2 != nil {
log.Errorf("failed to update pod task: %v", err2) log.Errorf("failed to update pod task: %v", err2)
} }
return err return err
@ -88,7 +88,7 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
// By this time, there is a chance that the slave is disconnected. // By this time, there is a chance that the slave is disconnected.
offerId := task.GetOfferId() offerId := task.GetOfferId()
if offer, ok := b.scheduler.Offers().Get(offerId); !ok || offer.HasExpired() { if offer, ok := b.fw.Offers().Get(offerId); !ok || offer.HasExpired() {
// already rescinded or timed out or otherwise invalidated // already rescinded or timed out or otherwise invalidated
return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID)) return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID))
} }
@ -96,10 +96,10 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil { if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB", log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory) task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
if err = b.scheduler.LaunchTask(task); err == nil { if err = b.fw.LaunchTask(task); err == nil {
b.scheduler.Offers().Invalidate(offerId) b.fw.Offers().Invalidate(offerId)
task.Set(podtask.Launched) task.Set(podtask.Launched)
if err = b.scheduler.Tasks().Update(task); err != nil { if err = b.fw.Tasks().Update(task); err != nil {
// this should only happen if the task has been removed or has changed status, // this should only happen if the task has been removed or has changed status,
// which SHOULD NOT HAPPEN as long as we're synchronizing correctly // which SHOULD NOT HAPPEN as long as we're synchronizing correctly
log.Errorf("failed to update task w/ Launched status: %v", err) log.Errorf("failed to update task w/ Launched status: %v", err)

View File

@ -22,22 +22,22 @@ import (
log "github.com/golang/glog" log "github.com/golang/glog"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
) )
type Deleter struct { type Deleter struct {
scheduler schedapi.Scheduler fw types.Framework
qr *queuer.Queuer qr *queuer.Queuer
} }
func NewDeleter(scheduler schedapi.Scheduler, qr *queuer.Queuer) *Deleter { func NewDeleter(fw types.Framework, qr *queuer.Queuer) *Deleter {
return &Deleter{ return &Deleter{
scheduler: scheduler, fw: fw,
qr: qr, qr: qr,
} }
} }
@ -72,8 +72,8 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
// removing the pod from the scheduling queue. this makes the concurrent // removing the pod from the scheduling queue. this makes the concurrent
// execution of scheduler-error-handling and delete-handling easier to // execution of scheduler-error-handling and delete-handling easier to
// reason about. // reason about.
k.scheduler.Lock() k.fw.Lock()
defer k.scheduler.Unlock() defer k.fw.Unlock()
// prevent the scheduler from attempting to pop this; it's also possible that // prevent the scheduler from attempting to pop this; it's also possible that
// it's concurrently being scheduled (somewhere between pod scheduling and // it's concurrently being scheduled (somewhere between pod scheduling and
@ -81,7 +81,7 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
// will abort Bind()ing // will abort Bind()ing
k.qr.Dequeue(pod.GetUID()) k.qr.Dequeue(pod.GetUID())
switch task, state := k.scheduler.Tasks().ForPod(podKey); state { switch task, state := k.fw.Tasks().ForPod(podKey); state {
case podtask.StateUnknown: case podtask.StateUnknown:
log.V(2).Infof("Could not resolve pod '%s' to task id", podKey) log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
return merrors.NoSuchPodErr return merrors.NoSuchPodErr
@ -96,11 +96,11 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
task.Reset() task.Reset()
task.Set(podtask.Deleted) task.Set(podtask.Deleted)
//TODO(jdef) probably want better handling here //TODO(jdef) probably want better handling here
if err := k.scheduler.Tasks().Update(task); err != nil { if err := k.fw.Tasks().Update(task); err != nil {
return err return err
} }
} }
k.scheduler.Tasks().Unregister(task) k.fw.Tasks().Unregister(task)
return nil return nil
} }
fallthrough fallthrough
@ -108,10 +108,10 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
case podtask.StateRunning: case podtask.StateRunning:
// signal to watchers that the related pod is going down // signal to watchers that the related pod is going down
task.Set(podtask.Deleted) task.Set(podtask.Deleted)
if err := k.scheduler.Tasks().Update(task); err != nil { if err := k.fw.Tasks().Update(task); err != nil {
log.Errorf("failed to update task w/ Deleted status: %v", err) log.Errorf("failed to update task w/ Deleted status: %v", err)
} }
return k.scheduler.KillTask(task.ID) return k.fw.KillTask(task.ID)
default: default:
log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID) log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)

View File

@ -19,20 +19,19 @@ package operations
import ( import (
"testing" "testing"
"k8s.io/kubernetes/pkg/api"
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api"
) )
func TestDeleteOne_NonexistentPod(t *testing.T) { func TestDeleteOne_NonexistentPod(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
obj := &schedapi.MockScheduler{} obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry() reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg) obj.On("Tasks").Return(reg)
@ -51,7 +50,7 @@ func TestDeleteOne_NonexistentPod(t *testing.T) {
func TestDeleteOne_PendingPod(t *testing.T) { func TestDeleteOne_PendingPod(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
obj := &schedapi.MockScheduler{} obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry() reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg) obj.On("Tasks").Return(reg)
@ -85,7 +84,7 @@ func TestDeleteOne_PendingPod(t *testing.T) {
func TestDeleteOne_Running(t *testing.T) { func TestDeleteOne_Running(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
obj := &schedapi.MockScheduler{} obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry() reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg) obj.On("Tasks").Return(reg)
@ -127,7 +126,7 @@ func TestDeleteOne_Running(t *testing.T) {
func TestDeleteOne_badPodNaming(t *testing.T) { func TestDeleteOne_badPodNaming(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
obj := &schedapi.MockScheduler{} obj := &types.MockScheduler{}
pod := &queuer.Pod{Pod: &api.Pod{}} pod := &queuer.Pod{Pod: &api.Pod{}}
d := NewDeleter(obj, queuer.New(nil)) d := NewDeleter(obj, queuer.New(nil))

View File

@ -21,26 +21,26 @@ import (
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/backoff" "k8s.io/kubernetes/contrib/mesos/pkg/backoff"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
) )
type ErrorHandler struct { type ErrorHandler struct {
scheduler schedapi.Scheduler fw types.Framework
backoff *backoff.Backoff backoff *backoff.Backoff
qr *queuer.Queuer qr *queuer.Queuer
} }
func NewErrorHandler(scheduler schedapi.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer) *ErrorHandler { func NewErrorHandler(fw types.Framework, backoff *backoff.Backoff, qr *queuer.Queuer) *ErrorHandler {
return &ErrorHandler{ return &ErrorHandler{
scheduler: scheduler, fw: fw,
backoff: backoff, backoff: backoff,
qr: qr, qr: qr,
} }
} }
@ -64,10 +64,10 @@ func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) {
} }
k.backoff.GC() k.backoff.GC()
k.scheduler.Lock() k.fw.Lock()
defer k.scheduler.Unlock() defer k.fw.Unlock()
switch task, state := k.scheduler.Tasks().ForPod(podKey); state { switch task, state := k.fw.Tasks().ForPod(podKey); state {
case podtask.StateUnknown: case podtask.StateUnknown:
// if we don't have a mapping here any more then someone deleted the pod // if we don't have a mapping here any more then someone deleted the pod
log.V(2).Infof("Could not resolve pod to task, aborting pod reschedule: %s", podKey) log.V(2).Infof("Could not resolve pod to task, aborting pod reschedule: %s", podKey)
@ -81,16 +81,16 @@ func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) {
breakoutEarly := queue.BreakChan(nil) breakoutEarly := queue.BreakChan(nil)
if schedulingErr == podschedulers.NoSuitableOffersErr { if schedulingErr == podschedulers.NoSuitableOffersErr {
log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) log.V(3).Infof("adding backoff breakout handler for pod %v", podKey)
breakoutEarly = queue.BreakChan(k.scheduler.Offers().Listen(podKey, func(offer *mesos.Offer) bool { breakoutEarly = queue.BreakChan(k.fw.Offers().Listen(podKey, func(offer *mesos.Offer) bool {
k.scheduler.Lock() k.fw.Lock()
defer k.scheduler.Unlock() defer k.fw.Unlock()
switch task, state := k.scheduler.Tasks().Get(task.ID); state { switch task, state := k.fw.Tasks().Get(task.ID); state {
case podtask.StatePending: case podtask.StatePending:
// Assess fitness of pod with the current offer. The scheduler normally // Assess fitness of pod with the current offer. The scheduler normally
// "backs off" when it can't find an offer that matches up with a pod. // "backs off" when it can't find an offer that matches up with a pod.
// The backoff period for a pod can terminate sooner if an offer becomes // The backoff period for a pod can terminate sooner if an offer becomes
// available that matches up. // available that matches up.
return !task.Has(podtask.Launched) && k.scheduler.PodScheduler().FitPredicate()(task, offer, nil) return !task.Has(podtask.Launched) && k.fw.PodScheduler().FitPredicate()(task, offer, nil)
default: default:
// no point in continuing to check for matching offers // no point in continuing to check for matching offers
return true return true

View File

@ -27,12 +27,12 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors" apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
@ -99,7 +99,7 @@ func (k *mesosSchedulerApiAdapter) LaunchTask(task *podtask.T) error {
// k8smSchedulingAlgorithm implements the algorithm.ScheduleAlgorithm interface // k8smSchedulingAlgorithm implements the algorithm.ScheduleAlgorithm interface
type schedulerApiAlgorithmAdapter struct { type schedulerApiAlgorithmAdapter struct {
scheduler schedapi.Scheduler fw types.Framework
podUpdates queue.FIFO podUpdates queue.FIFO
} }
@ -115,10 +115,10 @@ func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.N
return "", err return "", err
} }
k.scheduler.Lock() k.fw.Lock()
defer k.scheduler.Unlock() defer k.fw.Unlock()
switch task, state := k.scheduler.Tasks().ForPod(podKey); state { switch task, state := k.fw.Tasks().ForPod(podKey); state {
case podtask.StateUnknown: case podtask.StateUnknown:
// There's a bit of a potential race here, a pod could have been yielded() and // There's a bit of a potential race here, a pod could have been yielded() and
// then before we get *here* it could be deleted. // then before we get *here* it could be deleted.
@ -133,7 +133,7 @@ func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.N
log.Infof("aborting Schedule, pod has been deleted %+v", pod) log.Infof("aborting Schedule, pod has been deleted %+v", pod)
return "", merrors.NoSuchPodErr return "", merrors.NoSuchPodErr
} }
return k.doSchedule(k.scheduler.Tasks().Register(k.scheduler.CreatePodTask(ctx, pod))) return k.doSchedule(k.fw.Tasks().Register(k.fw.CreatePodTask(ctx, pod)))
//TODO(jdef) it's possible that the pod state has diverged from what //TODO(jdef) it's possible that the pod state has diverged from what
//we knew previously, we should probably update the task.Pod state here //we knew previously, we should probably update the task.Pod state here
@ -163,19 +163,19 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s
if task.HasAcceptedOffer() { if task.HasAcceptedOffer() {
// verify that the offer is still on the table // verify that the offer is still on the table
offerId := task.GetOfferId() offerId := task.GetOfferId()
if offer, ok := k.scheduler.Offers().Get(offerId); ok && !offer.HasExpired() { if offer, ok := k.fw.Offers().Get(offerId); ok && !offer.HasExpired() {
// skip tasks that already have assigned offers // skip tasks that already have assigned offers
offer = task.Offer offer = task.Offer
} else { } else {
task.Offer.Release() task.Offer.Release()
task.Reset() task.Reset()
if err = k.scheduler.Tasks().Update(task); err != nil { if err = k.fw.Tasks().Update(task); err != nil {
return "", err return "", err
} }
} }
} }
if err == nil && offer == nil { if err == nil && offer == nil {
offer, err = k.scheduler.PodScheduler().SchedulePod(k.scheduler.Offers(), k.scheduler, task) offer, err = k.fw.PodScheduler().SchedulePod(k.fw.Offers(), k.fw, task)
} }
if err != nil { if err != nil {
return "", err return "", err
@ -185,10 +185,10 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s
return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID) return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
} }
slaveId := details.GetSlaveId().GetValue() slaveId := details.GetSlaveId().GetValue()
if slaveHostName := k.scheduler.SlaveHostNameFor(slaveId); slaveHostName == "" { if slaveHostName := k.fw.SlaveHostNameFor(slaveId); slaveHostName == "" {
// not much sense in Release()ing the offer here since its owner died // not much sense in Release()ing the offer here since its owner died
offer.Release() offer.Release()
k.scheduler.Offers().Invalidate(details.Id.GetValue()) k.fw.Offers().Invalidate(details.Id.GetValue())
return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID) return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
} else { } else {
if task.Offer != nil && task.Offer != offer { if task.Offer != nil && task.Offer != offer {
@ -196,9 +196,9 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s
} }
task.Offer = offer task.Offer = offer
k.scheduler.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? k.fw.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
if err := k.scheduler.Tasks().Update(task); err != nil { if err := k.fw.Tasks().Update(task); err != nil {
offer.Release() offer.Release()
return "", err return "", err
} }
@ -208,31 +208,31 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s
type PluginConfig struct { type PluginConfig struct {
*plugin.Config *plugin.Config
scheduler schedapi.Scheduler fw types.Framework
client *client.Client client *client.Client
qr *queuer.Queuer qr *queuer.Queuer
deleter *operations.Deleter deleter *operations.Deleter
starting chan struct{} // startup latch starting chan struct{} // startup latch
} }
func NewPlugin(c *PluginConfig) PluginInterface { func NewPlugin(c *PluginConfig) PluginInterface {
return &schedulerPlugin{ return &schedulerPlugin{
config: c.Config, config: c.Config,
scheduler: c.scheduler, fw: c.fw,
client: c.client, client: c.client,
qr: c.qr, qr: c.qr,
deleter: c.deleter, deleter: c.deleter,
starting: c.starting, starting: c.starting,
} }
} }
type schedulerPlugin struct { type schedulerPlugin struct {
config *plugin.Config config *plugin.Config
scheduler schedapi.Scheduler fw types.Framework
client *client.Client client *client.Client
qr *queuer.Queuer qr *queuer.Queuer
deleter *operations.Deleter deleter *operations.Deleter
starting chan struct{} starting chan struct{}
} }
func (s *schedulerPlugin) Run(done <-chan struct{}) { func (s *schedulerPlugin) Run(done <-chan struct{}) {
@ -320,10 +320,10 @@ func (s *schedulerPlugin) reconcileTask(t *podtask.T) {
return return
} }
s.scheduler.Lock() s.fw.Lock()
defer s.scheduler.Unlock() defer s.fw.Unlock()
if _, state := s.scheduler.Tasks().ForPod(podKey); state != podtask.StateUnknown { if _, state := s.fw.Tasks().ForPod(podKey); state != podtask.StateUnknown {
//TODO(jdef) reconcile the task //TODO(jdef) reconcile the task
log.Errorf("task already registered for pod %v", pod.Name) log.Errorf("task already registered for pod %v", pod.Name)
return return

View File

@ -809,7 +809,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name) podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
t, _ := lt.plugin.scheduler.Tasks().ForPod(podKey) t, _ := lt.plugin.fw.Tasks().ForPod(podKey)
return t == nil return t == nil
}) })

View File

@ -14,6 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// Package api contains an abstract scheduler interface, implemented by the // Package types contains an abstract framework interface, implemented by the
// scheduler plugin and consumed by the scheduler operations. // MesosScheduler and consumed by the scheduler operations.
package api package types

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package api package types
import ( import (
"sync" "sync"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package api package types
import ( import (
"sync" "sync"
@ -25,8 +25,9 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
) )
// scheduler abstraction to allow for easier unit testing // Framework abstracts everything other components of the scheduler need from
type Scheduler interface { // the actual MesosScheduler implementation.
type Framework interface {
sync.Locker // synchronize scheduler plugin operations sync.Locker // synchronize scheduler plugin operations
podschedulers.SlaveIndex podschedulers.SlaveIndex