Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-10 12:32:03 +00:00)
Rename SchedulerApi -> Scheduler, api -> scheduler
commit 4b715cfcc5
parent 5f8e0a60bf
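The diff below is a mechanical rename across the Mesos scheduler packages: the SchedulerApi interface becomes Scheduler, and the fields, parameters, and locals that hold it (api, kapi) become scheduler. The following minimal Go sketch only illustrates the resulting shape; the interface body, the LaunchTask signature, and the fakeScheduler type are simplified stand-ins for this example, not the repository's actual definitions.

package main

import (
	"fmt"
	"sync"
)

// Scheduler (formerly SchedulerApi) stands in for the renamed interface.
// The real interface also embeds podschedulers.SlaveIndex and exposes
// Tasks(), Offers(), CreatePodTask, and more; this is a trimmed sketch.
type Scheduler interface {
	sync.Locker
	LaunchTask(taskID string) error // hypothetical signature, for illustration only
}

// Binder mirrors the operations.Binder change: the field that used to be
// `api schedapi.SchedulerApi` is now `scheduler schedapi.Scheduler`.
type Binder struct {
	scheduler Scheduler
}

func NewBinder(scheduler Scheduler) *Binder {
	return &Binder{scheduler: scheduler}
}

func (b *Binder) Bind(taskID string) error {
	b.scheduler.Lock()         // was: b.api.Lock()
	defer b.scheduler.Unlock() // was: defer b.api.Unlock()
	return b.scheduler.LaunchTask(taskID)
}

// fakeScheduler is a toy implementation so the sketch compiles and runs.
type fakeScheduler struct{ sync.Mutex }

func (f *fakeScheduler) LaunchTask(taskID string) error {
	fmt.Println("launching", taskID)
	return nil
}

func main() {
	b := NewBinder(&fakeScheduler{})
	_ = b.Bind("pod-task-1")
}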
@@ -26,7 +26,7 @@ import (
 )
 
 // scheduler abstraction to allow for easier unit testing
-type SchedulerApi interface {
+type Scheduler interface {
     sync.Locker // synchronize scheduler plugin operations
 
     podschedulers.SlaveIndex
@@ -37,11 +37,11 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/proc"
     "k8s.io/kubernetes/contrib/mesos/pkg/queue"
     "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
-    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
     schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
+    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave"
@@ -936,13 +936,13 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se
     // lock that guards critial sections that involve transferring pods from
     // the store (cache) to the scheduling queue; its purpose is to maintain
     // an ordering (vs interleaving) of operations that's easier to reason about.
-    kapi := &mesosSchedulerApiAdapter{mesosScheduler: k}
+    scheduler := &mesosSchedulerApiAdapter{mesosScheduler: k}
     q := queuer.New(podUpdates)
-    podDeleter := operations.NewDeleter(kapi, q)
+    podDeleter := operations.NewDeleter(scheduler, q)
     eh := &errorHandler{
-        api: kapi,
+        scheduler: scheduler,
         backoff: backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration),
         qr: q,
     }
     startLatch := make(chan struct{})
     eventBroadcaster := record.NewBroadcaster()
@@ -959,18 +959,18 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se
         Config: &plugin.Config{
             NodeLister: nil,
             Algorithm: &schedulerApiAlgorithmAdapter{
-                api: kapi,
+                scheduler: scheduler,
                 podUpdates: podUpdates,
             },
-            Binder: operations.NewBinder(kapi),
+            Binder: operations.NewBinder(scheduler),
             NextPod: q.Yield,
             Error: eh.handleSchedulingError,
             Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
         },
-        api: kapi,
+        scheduler: scheduler,
         client: k.client,
         qr: q,
         deleter: podDeleter,
         starting: startLatch,
     }
 }
@@ -29,12 +29,12 @@ import (
 )
 
 type Binder struct {
-    api schedapi.SchedulerApi
+    scheduler schedapi.Scheduler
 }
 
-func NewBinder(api schedapi.SchedulerApi) *Binder {
+func NewBinder(scheduler schedapi.Scheduler) *Binder {
     return &Binder{
-        api: api,
+        scheduler: scheduler,
     }
 }
 
@@ -49,10 +49,10 @@ func (b *Binder) Bind(binding *api.Binding) error {
         return err
     }
 
-    b.api.Lock()
-    defer b.api.Unlock()
+    b.scheduler.Lock()
+    defer b.scheduler.Unlock()
 
-    switch task, state := b.api.Tasks().ForPod(podKey); state {
+    switch task, state := b.scheduler.Tasks().ForPod(podKey); state {
     case podtask.StatePending:
         return b.bind(ctx, binding, task)
     default:
@@ -66,7 +66,7 @@ func (b *Binder) Bind(binding *api.Binding) error {
 func (b *Binder) rollback(task *podtask.T, err error) error {
     task.Offer.Release()
     task.Reset()
-    if err2 := b.api.Tasks().Update(task); err2 != nil {
+    if err2 := b.scheduler.Tasks().Update(task); err2 != nil {
         log.Errorf("failed to update pod task: %v", err2)
     }
     return err
@@ -88,7 +88,7 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
 
     // By this time, there is a chance that the slave is disconnected.
     offerId := task.GetOfferId()
-    if offer, ok := b.api.Offers().Get(offerId); !ok || offer.HasExpired() {
+    if offer, ok := b.scheduler.Offers().Get(offerId); !ok || offer.HasExpired() {
         // already rescinded or timed out or otherwise invalidated
         return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID))
     }
@@ -96,10 +96,10 @@ func (b *Binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
     if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
         log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
             task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
-        if err = b.api.LaunchTask(task); err == nil {
-            b.api.Offers().Invalidate(offerId)
+        if err = b.scheduler.LaunchTask(task); err == nil {
+            b.scheduler.Offers().Invalidate(offerId)
             task.Set(podtask.Launched)
-            if err = b.api.Tasks().Update(task); err != nil {
+            if err = b.scheduler.Tasks().Update(task); err != nil {
                 // this should only happen if the task has been removed or has changed status,
                 // which SHOULD NOT HAPPEN as long as we're synchronizing correctly
                 log.Errorf("failed to update task w/ Launched status: %v", err)
@@ -30,14 +30,14 @@ import (
 )
 
 type Deleter struct {
-    api schedapi.SchedulerApi
+    scheduler schedapi.Scheduler
     qr *queuer.Queuer
 }
 
-func NewDeleter(api schedapi.SchedulerApi, qr *queuer.Queuer) *Deleter {
+func NewDeleter(scheduler schedapi.Scheduler, qr *queuer.Queuer) *Deleter {
     return &Deleter{
-        api: api,
+        scheduler: scheduler,
         qr: qr,
     }
 }
 
@@ -72,8 +72,8 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
     // removing the pod from the scheduling queue. this makes the concurrent
     // execution of scheduler-error-handling and delete-handling easier to
     // reason about.
-    k.api.Lock()
-    defer k.api.Unlock()
+    k.scheduler.Lock()
+    defer k.scheduler.Unlock()
 
     // prevent the scheduler from attempting to pop this; it's also possible that
     // it's concurrently being scheduled (somewhere between pod scheduling and
@@ -81,7 +81,7 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
     // will abort Bind()ing
     k.qr.Dequeue(pod.GetUID())
 
-    switch task, state := k.api.Tasks().ForPod(podKey); state {
+    switch task, state := k.scheduler.Tasks().ForPod(podKey); state {
     case podtask.StateUnknown:
         log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
         return merrors.NoSuchPodErr
@@ -96,11 +96,11 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
                 task.Reset()
                 task.Set(podtask.Deleted)
                 //TODO(jdef) probably want better handling here
-                if err := k.api.Tasks().Update(task); err != nil {
+                if err := k.scheduler.Tasks().Update(task); err != nil {
                     return err
                 }
             }
-            k.api.Tasks().Unregister(task)
+            k.scheduler.Tasks().Unregister(task)
             return nil
         }
         fallthrough
@@ -108,10 +108,10 @@ func (k *Deleter) DeleteOne(pod *queuer.Pod) error {
     case podtask.StateRunning:
         // signal to watchers that the related pod is going down
         task.Set(podtask.Deleted)
-        if err := k.api.Tasks().Update(task); err != nil {
+        if err := k.scheduler.Tasks().Update(task); err != nil {
             log.Errorf("failed to update task w/ Deleted status: %v", err)
         }
-        return k.api.KillTask(task.ID)
+        return k.scheduler.KillTask(task.ID)
 
     default:
         log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)
@@ -28,10 +28,10 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/offers"
     "k8s.io/kubernetes/contrib/mesos/pkg/queue"
     "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
-    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
     schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api"
     merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
+    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
     "k8s.io/kubernetes/pkg/api"
@@ -102,7 +102,7 @@ func (k *mesosSchedulerApiAdapter) LaunchTask(task *podtask.T) error {
 
 // k8smSchedulingAlgorithm implements the algorithm.ScheduleAlgorithm interface
 type schedulerApiAlgorithmAdapter struct {
-    api schedapi.SchedulerApi
+    scheduler schedapi.Scheduler
     podUpdates queue.FIFO
 }
 
@@ -118,10 +118,10 @@ func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.N
         return "", err
     }
 
-    k.api.Lock()
-    defer k.api.Unlock()
+    k.scheduler.Lock()
+    defer k.scheduler.Unlock()
 
-    switch task, state := k.api.Tasks().ForPod(podKey); state {
+    switch task, state := k.scheduler.Tasks().ForPod(podKey); state {
     case podtask.StateUnknown:
         // There's a bit of a potential race here, a pod could have been yielded() and
         // then before we get *here* it could be deleted.
@@ -136,7 +136,7 @@ func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.N
             log.Infof("aborting Schedule, pod has been deleted %+v", pod)
             return "", merrors.NoSuchPodErr
         }
-        return k.doSchedule(k.api.Tasks().Register(k.api.CreatePodTask(ctx, pod)))
+        return k.doSchedule(k.scheduler.Tasks().Register(k.scheduler.CreatePodTask(ctx, pod)))
 
     //TODO(jdef) it's possible that the pod state has diverged from what
     //we knew previously, we should probably update the task.Pod state here
@@ -166,19 +166,19 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s
     if task.HasAcceptedOffer() {
         // verify that the offer is still on the table
         offerId := task.GetOfferId()
-        if offer, ok := k.api.Offers().Get(offerId); ok && !offer.HasExpired() {
+        if offer, ok := k.scheduler.Offers().Get(offerId); ok && !offer.HasExpired() {
             // skip tasks that have already have assigned offers
             offer = task.Offer
         } else {
             task.Offer.Release()
             task.Reset()
-            if err = k.api.Tasks().Update(task); err != nil {
+            if err = k.scheduler.Tasks().Update(task); err != nil {
                 return "", err
             }
         }
     }
     if err == nil && offer == nil {
-        offer, err = k.api.PodScheduler().SchedulePod(k.api.Offers(), k.api, task)
+        offer, err = k.scheduler.PodScheduler().SchedulePod(k.scheduler.Offers(), k.scheduler, task)
     }
     if err != nil {
         return "", err
@@ -188,10 +188,10 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s
         return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
     }
     slaveId := details.GetSlaveId().GetValue()
-    if slaveHostName := k.api.SlaveHostNameFor(slaveId); slaveHostName == "" {
+    if slaveHostName := k.scheduler.SlaveHostNameFor(slaveId); slaveHostName == "" {
         // not much sense in Release()ing the offer here since its owner died
         offer.Release()
-        k.api.Offers().Invalidate(details.Id.GetValue())
+        k.scheduler.Offers().Invalidate(details.Id.GetValue())
         return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
     } else {
         if task.Offer != nil && task.Offer != offer {
@@ -199,9 +199,9 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s
         }
 
         task.Offer = offer
-        k.api.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
+        k.scheduler.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
 
-        if err := k.api.Tasks().Update(task); err != nil {
+        if err := k.scheduler.Tasks().Update(task); err != nil {
             offer.Release()
             return "", err
         }
@@ -210,9 +210,9 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s
 }
 
 type errorHandler struct {
-    api schedapi.SchedulerApi
+    scheduler schedapi.Scheduler
     backoff *backoff.Backoff
     qr *queuer.Queuer
 }
 
 // implementation of scheduling plugin's Error func; see plugin/pkg/scheduler
@@ -235,10 +235,10 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
     }
 
     k.backoff.GC()
-    k.api.Lock()
-    defer k.api.Unlock()
+    k.scheduler.Lock()
+    defer k.scheduler.Unlock()
 
-    switch task, state := k.api.Tasks().ForPod(podKey); state {
+    switch task, state := k.scheduler.Tasks().ForPod(podKey); state {
     case podtask.StateUnknown:
         // if we don't have a mapping here any more then someone deleted the pod
         log.V(2).Infof("Could not resolve pod to task, aborting pod reschdule: %s", podKey)
@@ -252,16 +252,16 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
         breakoutEarly := queue.BreakChan(nil)
         if schedulingErr == podschedulers.NoSuitableOffersErr {
             log.V(3).Infof("adding backoff breakout handler for pod %v", podKey)
-            breakoutEarly = queue.BreakChan(k.api.Offers().Listen(podKey, func(offer *mesos.Offer) bool {
-                k.api.Lock()
-                defer k.api.Unlock()
-                switch task, state := k.api.Tasks().Get(task.ID); state {
+            breakoutEarly = queue.BreakChan(k.scheduler.Offers().Listen(podKey, func(offer *mesos.Offer) bool {
+                k.scheduler.Lock()
+                defer k.scheduler.Unlock()
+                switch task, state := k.scheduler.Tasks().Get(task.ID); state {
                 case podtask.StatePending:
                     // Assess fitness of pod with the current offer. The scheduler normally
                     // "backs off" when it can't find an offer that matches up with a pod.
                     // The backoff period for a pod can terminate sooner if an offer becomes
                     // available that matches up.
-                    return !task.Has(podtask.Launched) && k.api.PodScheduler().FitPredicate()(task, offer, nil)
+                    return !task.Has(podtask.Launched) && k.scheduler.PodScheduler().FitPredicate()(task, offer, nil)
                 default:
                     // no point in continuing to check for matching offers
                     return true
@@ -279,31 +279,31 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
 
 type PluginConfig struct {
     *plugin.Config
-    api schedapi.SchedulerApi
+    scheduler schedapi.Scheduler
     client *client.Client
     qr *queuer.Queuer
     deleter *operations.Deleter
     starting chan struct{} // startup latch
 }
 
 func NewPlugin(c *PluginConfig) PluginInterface {
     return &schedulerPlugin{
         config: c.Config,
-        api: c.api,
+        scheduler: c.scheduler,
         client: c.client,
         qr: c.qr,
         deleter: c.deleter,
         starting: c.starting,
     }
 }
 
 type schedulerPlugin struct {
     config *plugin.Config
-    api schedapi.SchedulerApi
+    scheduler schedapi.Scheduler
     client *client.Client
     qr *queuer.Queuer
     deleter *operations.Deleter
     starting chan struct{}
 }
 
 func (s *schedulerPlugin) Run(done <-chan struct{}) {
@@ -391,10 +391,10 @@ func (s *schedulerPlugin) reconcileTask(t *podtask.T) {
         return
     }
 
-    s.api.Lock()
-    defer s.api.Unlock()
+    s.scheduler.Lock()
+    defer s.scheduler.Unlock()
 
-    if _, state := s.api.Tasks().ForPod(podKey); state != podtask.StateUnknown {
+    if _, state := s.scheduler.Tasks().ForPod(podKey); state != podtask.StateUnknown {
         //TODO(jdef) reconcile the task
         log.Errorf("task already registered for pod %v", pod.Name)
         return
@@ -41,10 +41,10 @@ import (
     "github.com/stretchr/testify/mock"
     assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert"
     "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
-    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
     schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
+    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
     mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "k8s.io/kubernetes/pkg/util"
@@ -809,7 +809,7 @@ func TestPlugin_LifeCycle(t *testing.T) {
 
     podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
     assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
-        t, _ := lt.plugin.api.Tasks().ForPod(podKey)
+        t, _ := lt.plugin.scheduler.Tasks().ForPod(podKey)
         return t == nil
     })
 