Rename kubeScheduler -> schedulerApiAlgorithmAdapter and k8smScheduler -> mesosSchedulerApiAdapter

This commit is contained in:
Dr. Stefan Schimanski 2015-10-25 15:12:09 -07:00
parent 30b5faff53
commit 98e48a2680

View File

@ -53,54 +53,55 @@ const (
Scheduled = "Scheduled" Scheduled = "Scheduled"
) )
type k8smScheduler struct { type mesosSchedulerApiAdapter struct {
sync.Mutex sync.Mutex
internal *KubernetesMesosScheduler mesosScheduler *KubernetesMesosScheduler
} }
func (k *k8smScheduler) Algorithm() malgorithm.PodScheduler { func (k *mesosSchedulerApiAdapter) Algorithm() malgorithm.PodScheduler {
return k.internal return k.mesosScheduler
} }
func (k *k8smScheduler) Offers() offers.Registry { func (k *mesosSchedulerApiAdapter) Offers() offers.Registry {
return k.internal.offers return k.mesosScheduler.offers
} }
func (k *k8smScheduler) Tasks() podtask.Registry { func (k *mesosSchedulerApiAdapter) Tasks() podtask.Registry {
return k.internal.taskRegistry return k.mesosScheduler.taskRegistry
} }
func (k *k8smScheduler) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) { func (k *mesosSchedulerApiAdapter) CreatePodTask(ctx api.Context, pod *api.Pod) (*podtask.T, error) {
return podtask.New(ctx, "", *pod, k.internal.executor) return podtask.New(ctx, "", *pod, k.mesosScheduler.executor)
} }
func (k *k8smScheduler) SlaveHostNameFor(id string) string { func (k *mesosSchedulerApiAdapter) SlaveHostNameFor(id string) string {
return k.internal.slaveHostNames.HostName(id) return k.mesosScheduler.slaveHostNames.HostName(id)
} }
func (k *k8smScheduler) KillTask(taskId string) error { func (k *mesosSchedulerApiAdapter) KillTask(taskId string) error {
killTaskId := mutil.NewTaskID(taskId) killTaskId := mutil.NewTaskID(taskId)
_, err := k.internal.driver.KillTask(killTaskId) _, err := k.mesosScheduler.driver.KillTask(killTaskId)
return err return err
} }
func (k *k8smScheduler) LaunchTask(task *podtask.T) error { func (k *mesosSchedulerApiAdapter) LaunchTask(task *podtask.T) error {
// assume caller is holding scheduler lock // assume caller is holding scheduler lock
taskList := []*mesos.TaskInfo{task.BuildTaskInfo()} taskList := []*mesos.TaskInfo{task.BuildTaskInfo()}
offerIds := []*mesos.OfferID{task.Offer.Details().Id} offerIds := []*mesos.OfferID{task.Offer.Details().Id}
filters := &mesos.Filters{} filters := &mesos.Filters{}
_, err := k.internal.driver.LaunchTasks(offerIds, taskList, filters) _, err := k.mesosScheduler.driver.LaunchTasks(offerIds, taskList, filters)
return err return err
} }
type kubeScheduler struct { // k8smSchedulingAlgorithm implements the algorithm.ScheduleAlgorithm interface
type schedulerApiAlgorithmAdapter struct {
api schedapi.SchedulerApi api schedapi.SchedulerApi
podUpdates queue.FIFO podUpdates queue.FIFO
} }
// Schedule implements the Scheduler interface of Kubernetes. // Schedule implements the Scheduler interface of Kubernetes.
// It returns the selectedMachine's name and error (if there's any). // It returns the selectedMachine's name and error (if there's any).
func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) { func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) {
log.Infof("Try to schedule pod %v\n", pod.Name) log.Infof("Try to schedule pod %v\n", pod.Name)
ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
@ -153,7 +154,7 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
} }
// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on // Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (string, error) {
var offer offers.Perishable var offer offers.Perishable
if task.HasAcceptedOffer() { if task.HasAcceptedOffer() {
// verify that the offer is still on the table // verify that the offer is still on the table
@ -286,7 +287,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
// lock that guards critial sections that involve transferring pods from // lock that guards critial sections that involve transferring pods from
// the store (cache) to the scheduling queue; its purpose is to maintain // the store (cache) to the scheduling queue; its purpose is to maintain
// an ordering (vs interleaving) of operations that's easier to reason about. // an ordering (vs interleaving) of operations that's easier to reason about.
kapi := &k8smScheduler{internal: k} kapi := &mesosSchedulerApiAdapter{mesosScheduler: k}
q := queuer.New(podUpdates) q := queuer.New(podUpdates)
podDeleter := operations.NewDeleter(kapi, q) podDeleter := operations.NewDeleter(kapi, q)
eh := &errorHandler{ eh := &errorHandler{
@ -308,7 +309,7 @@ func (k *KubernetesMesosScheduler) NewPluginConfig(terminate <-chan struct{}, mu
return &PluginConfig{ return &PluginConfig{
Config: &plugin.Config{ Config: &plugin.Config{
NodeLister: nil, NodeLister: nil,
Algorithm: &kubeScheduler{ Algorithm: &schedulerApiAlgorithmAdapter{
api: kapi, api: kapi,
podUpdates: podUpdates, podUpdates: podUpdates,
}, },