From e5ce6eccf95869e3890a165ad87bfa760584f4c7 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Mon, 26 Oct 2015 21:40:25 -0500 Subject: [PATCH] Rename schedulerApiAlgorithmAdapter -> mesosSchedulerAlgorithm, move to algorithm.go --- contrib/mesos/pkg/scheduler/algorithm.go | 140 +++++++++++++++++++++++ contrib/mesos/pkg/scheduler/plugin.go | 114 ------------------ contrib/mesos/pkg/scheduler/scheduler.go | 2 +- 3 files changed, 141 insertions(+), 115 deletions(-) create mode 100644 contrib/mesos/pkg/scheduler/algorithm.go diff --git a/contrib/mesos/pkg/scheduler/algorithm.go b/contrib/mesos/pkg/scheduler/algorithm.go new file mode 100644 index 00000000000..0355ed519c1 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/algorithm.go @@ -0,0 +1,140 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "fmt" + + log "github.com/golang/glog" + "k8s.io/kubernetes/contrib/mesos/pkg/offers" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" + merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" +) + +// mesosSchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface +type mesosSchedulerAlgorithm struct { + fw types.Framework + podUpdates queue.FIFO +} + +// Schedule implements the Scheduler interface of Kubernetes. +// It returns the selectedMachine's name and error (if there's any). +func (k *mesosSchedulerAlgorithm) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) { + log.Infof("Try to schedule pod %v\n", pod.Name) + ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) + + // default upstream scheduler passes pod.Name as binding.PodID + podKey, err := podtask.MakePodKey(ctx, pod.Name) + if err != nil { + return "", err + } + + k.fw.Lock() + defer k.fw.Unlock() + + switch task, state := k.fw.Tasks().ForPod(podKey); state { + case podtask.StateUnknown: + // There's a bit of a potential race here, a pod could have been yielded() and + // then before we get *here* it could be deleted. + // We use meta to index the pod in the store since that's what k8s reflector does. 
+ podName, err := cache.MetaNamespaceKeyFunc(pod) + if err != nil { + log.Warningf("aborting Schedule, unable to understand pod object %+v", pod) + return "", merrors.NoSuchPodErr + } + if deleted := k.podUpdates.Poll(podName, queue.DELETE_EVENT); deleted { + // avoid scheduling a pod that's been deleted between yieldPod() and Schedule() + log.Infof("aborting Schedule, pod has been deleted %+v", pod) + return "", merrors.NoSuchPodErr + } + return k.doSchedule(k.fw.Tasks().Register(k.fw.CreatePodTask(ctx, pod))) + + //TODO(jdef) it's possible that the pod state has diverged from what + //we knew previously, we should probably update the task.Pod state here + //before proceeding with scheduling + case podtask.StatePending: + if pod.UID != task.Pod.UID { + // we're dealing with a brand new pod spec here, so the old one must have been + // deleted -- and so our task store is out of sync w/ respect to reality + //TODO(jdef) reconcile task + return "", fmt.Errorf("task %v spec is out of sync with pod %v spec, aborting schedule", task.ID, pod.Name) + } else if task.Has(podtask.Launched) { + // task has been marked as "launched" but the pod binding creation may have failed in k8s, + // but we're going to let someone else handle it, probably the mesos task error handler + return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID) + } else { + return k.doSchedule(task, nil) + } + + default: + return "", fmt.Errorf("task %s is not pending, nothing to schedule", task.ID) + } +} + +// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on +func (k *mesosSchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) { + var offer offers.Perishable + if task.HasAcceptedOffer() { + // verify that the offer is still on the table + offerId := task.GetOfferId() + if offer, ok := k.fw.Offers().Get(offerId); ok && !offer.HasExpired() { + // skip tasks that already have assigned offers. NOTE(review): this `offer` is the variable declared by the enclosing if statement, which shadows the outer `offer`, so the assignment below does not set the outer one -- confirm whether that is intended + 
offer = task.Offer + } else { + task.Offer.Release() + task.Reset() + if err = k.fw.Tasks().Update(task); err != nil { + return "", err + } + } + } + if err == nil && offer == nil { + offer, err = k.fw.PodScheduler().SchedulePod(k.fw.Offers(), k.fw, task) + } + if err != nil { + return "", err + } + details := offer.Details() + if details == nil { + return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID) + } + slaveId := details.GetSlaveId().GetValue() + if slaveHostName := k.fw.SlaveHostNameFor(slaveId); slaveHostName == "" { + // not much sense in Release()ing the offer here since its owner died + offer.Release() + k.fw.Offers().Invalidate(details.Id.GetValue()) + return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID) + } else { + if task.Offer != nil && task.Offer != offer { + return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer) + } + + task.Offer = offer + k.fw.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? + + if err := k.fw.Tasks().Update(task); err != nil { + offer.Release() + return "", err + } + return slaveHostName, nil + } +} diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index a906e90ce4d..2fe7ab8146a 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -17,12 +17,9 @@ limitations under the License. 
package scheduler import ( - "fmt" "time" log "github.com/golang/glog" - "k8s.io/kubernetes/contrib/mesos/pkg/offers" - "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" @@ -31,10 +28,8 @@ import ( types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types" "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" plugin "k8s.io/kubernetes/plugin/pkg/scheduler" - "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" ) const ( @@ -53,115 +48,6 @@ type PluginInterface interface { Run(<-chan struct{}) } -// k8smSchedulingAlgorithm implements the algorithm.ScheduleAlgorithm interface -type schedulerApiAlgorithmAdapter struct { - fw types.Framework - podUpdates queue.FIFO -} - -// Schedule implements the Scheduler interface of Kubernetes. -// It returns the selectedMachine's name and error (if there's any). -func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) { - log.Infof("Try to schedule pod %v\n", pod.Name) - ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) - - // default upstream scheduler passes pod.Name as binding.PodID - podKey, err := podtask.MakePodKey(ctx, pod.Name) - if err != nil { - return "", err - } - - k.fw.Lock() - defer k.fw.Unlock() - - switch task, state := k.fw.Tasks().ForPod(podKey); state { - case podtask.StateUnknown: - // There's a bit of a potential race here, a pod could have been yielded() and - // then before we get *here* it could be deleted. - // We use meta to index the pod in the store since that's what k8s reflector does. 
- podName, err := cache.MetaNamespaceKeyFunc(pod) - if err != nil { - log.Warningf("aborting Schedule, unable to understand pod object %+v", pod) - return "", merrors.NoSuchPodErr - } - if deleted := k.podUpdates.Poll(podName, queue.DELETE_EVENT); deleted { - // avoid scheduling a pod that's been deleted between yieldPod() and Schedule() - log.Infof("aborting Schedule, pod has been deleted %+v", pod) - return "", merrors.NoSuchPodErr - } - return k.doSchedule(k.fw.Tasks().Register(k.fw.CreatePodTask(ctx, pod))) - - //TODO(jdef) it's possible that the pod state has diverged from what - //we knew previously, we should probably update the task.Pod state here - //before proceeding with scheduling - case podtask.StatePending: - if pod.UID != task.Pod.UID { - // we're dealing with a brand new pod spec here, so the old one must have been - // deleted -- and so our task store is out of sync w/ respect to reality - //TODO(jdef) reconcile task - return "", fmt.Errorf("task %v spec is out of sync with pod %v spec, aborting schedule", task.ID, pod.Name) - } else if task.Has(podtask.Launched) { - // task has been marked as "launched" but the pod binding creation may have failed in k8s, - // but we're going to let someone else handle it, probably the mesos task error handler - return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID) - } else { - return k.doSchedule(task, nil) - } - - default: - return "", fmt.Errorf("task %s is not pending, nothing to schedule", task.ID) - } -} - -// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on -func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (string, error) { - var offer offers.Perishable - if task.HasAcceptedOffer() { - // verify that the offer is still on the table - offerId := task.GetOfferId() - if offer, ok := k.fw.Offers().Get(offerId); ok && !offer.HasExpired() { - // skip tasks that have already have assigned offers - 
offer = task.Offer - } else { - task.Offer.Release() - task.Reset() - if err = k.fw.Tasks().Update(task); err != nil { - return "", err - } - } - } - if err == nil && offer == nil { - offer, err = k.fw.PodScheduler().SchedulePod(k.fw.Offers(), k.fw, task) - } - if err != nil { - return "", err - } - details := offer.Details() - if details == nil { - return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID) - } - slaveId := details.GetSlaveId().GetValue() - if slaveHostName := k.fw.SlaveHostNameFor(slaveId); slaveHostName == "" { - // not much sense in Release()ing the offer here since its owner died - offer.Release() - k.fw.Offers().Invalidate(details.Id.GetValue()) - return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID) - } else { - if task.Offer != nil && task.Offer != offer { - return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer) - } - - task.Offer = offer - k.fw.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? - - if err := k.fw.Tasks().Update(task); err != nil { - offer.Release() - return "", err - } - return slaveHostName, nil - } -} - type PluginConfig struct { *plugin.Config fw types.Framework diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 130f1d06204..0336a660646 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -796,7 +796,7 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se return &PluginConfig{ Config: &plugin.Config{ NodeLister: nil, - Algorithm: &schedulerApiAlgorithmAdapter{ + Algorithm: &mesosSchedulerAlgorithm{ fw: scheduler, podUpdates: podUpdates, },