Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-12 05:21:58 +00:00)
Rename schedulerApiAlgorithmAdapter -> mesosSchedulerAlgorithm, move to algorithm.go
This commit is contained in:
parent 18fbc1fe57
commit e5ce6eccf9

contrib/mesos/pkg/scheduler/algorithm.go (new file, 140 lines added)
@@ -0,0 +1,140 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
    "fmt"

    log "github.com/golang/glog"
    "k8s.io/kubernetes/contrib/mesos/pkg/offers"
    "k8s.io/kubernetes/contrib/mesos/pkg/queue"
    merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
    types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/client/cache"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
)

// mesosSchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
type mesosSchedulerAlgorithm struct {
    fw         types.Framework
    podUpdates queue.FIFO
}

// Schedule implements the Scheduler interface of Kubernetes.
// It returns the selectedMachine's name and error (if there's any).
func (k *mesosSchedulerAlgorithm) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) {
    log.Infof("Try to schedule pod %v\n", pod.Name)
    ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)

    // default upstream scheduler passes pod.Name as binding.PodID
    podKey, err := podtask.MakePodKey(ctx, pod.Name)
    if err != nil {
        return "", err
    }

    k.fw.Lock()
    defer k.fw.Unlock()

    switch task, state := k.fw.Tasks().ForPod(podKey); state {
    case podtask.StateUnknown:
        // There's a bit of a potential race here, a pod could have been yielded() and
        // then before we get *here* it could be deleted.
        // We use meta to index the pod in the store since that's what k8s reflector does.
        podName, err := cache.MetaNamespaceKeyFunc(pod)
        if err != nil {
            log.Warningf("aborting Schedule, unable to understand pod object %+v", pod)
            return "", merrors.NoSuchPodErr
        }
        if deleted := k.podUpdates.Poll(podName, queue.DELETE_EVENT); deleted {
            // avoid scheduling a pod that's been deleted between yieldPod() and Schedule()
            log.Infof("aborting Schedule, pod has been deleted %+v", pod)
            return "", merrors.NoSuchPodErr
        }
        return k.doSchedule(k.fw.Tasks().Register(k.fw.CreatePodTask(ctx, pod)))

    //TODO(jdef) it's possible that the pod state has diverged from what
    //we knew previously, we should probably update the task.Pod state here
    //before proceeding with scheduling
    case podtask.StatePending:
        if pod.UID != task.Pod.UID {
            // we're dealing with a brand new pod spec here, so the old one must have been
            // deleted -- and so our task store is out of sync w/ respect to reality
            //TODO(jdef) reconcile task
            return "", fmt.Errorf("task %v spec is out of sync with pod %v spec, aborting schedule", task.ID, pod.Name)
        } else if task.Has(podtask.Launched) {
            // task has been marked as "launched" but the pod binding creation may have failed in k8s,
            // but we're going to let someone else handle it, probably the mesos task error handler
            return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID)
        } else {
            return k.doSchedule(task, nil)
        }

    default:
        return "", fmt.Errorf("task %s is not pending, nothing to schedule", task.ID)
    }
}

// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
func (k *mesosSchedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) {
    var offer offers.Perishable
    if task.HasAcceptedOffer() {
        // verify that the offer is still on the table
        offerId := task.GetOfferId()
        if offer, ok := k.fw.Offers().Get(offerId); ok && !offer.HasExpired() {
            // skip tasks that have already have assigned offers
            offer = task.Offer
        } else {
            task.Offer.Release()
            task.Reset()
            if err = k.fw.Tasks().Update(task); err != nil {
                return "", err
            }
        }
    }
    if err == nil && offer == nil {
        offer, err = k.fw.PodScheduler().SchedulePod(k.fw.Offers(), k.fw, task)
    }
    if err != nil {
        return "", err
    }
    details := offer.Details()
    if details == nil {
        return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
    }
    slaveId := details.GetSlaveId().GetValue()
    if slaveHostName := k.fw.SlaveHostNameFor(slaveId); slaveHostName == "" {
        // not much sense in Release()ing the offer here since its owner died
        offer.Release()
        k.fw.Offers().Invalidate(details.Id.GetValue())
        return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
    } else {
        if task.Offer != nil && task.Offer != offer {
            return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
        }

        task.Offer = offer
        k.fw.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?

        if err := k.fw.Tasks().Update(task); err != nil {
            offer.Release()
            return "", err
        }
        return slaveHostName, nil
    }
}
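For context, the renamed type plugs into the stock Kubernetes scheduler plugin through the algorithm.ScheduleAlgorithm contract referenced in the comment above: given a pod (and a node lister, which this Mesos implementation ignores), Schedule returns the name of the machine to bind to, or an error. The following is a minimal, self-contained sketch of that contract with a toy implementation; the pod and staticAlgorithm types here are hypothetical placeholders introduced only for illustration, not the real k8s.io types, and the sketch shows only the interface shape, not the offer handling above.

package main

import "fmt"

// pod is a hypothetical stand-in for api.Pod, carrying just what the sketch needs.
type pod struct {
    Name      string
    Namespace string
}

// scheduleAlgorithm mirrors the shape of the upstream contract: pick a machine for a pod.
type scheduleAlgorithm interface {
    Schedule(p *pod) (selectedMachine string, err error)
}

// staticAlgorithm is a toy implementation that always returns the same node,
// in the same way mesosSchedulerAlgorithm returns the slave's hostname on success.
type staticAlgorithm struct {
    node string
}

func (a *staticAlgorithm) Schedule(p *pod) (string, error) {
    if a.node == "" {
        return "", fmt.Errorf("no node available for pod %s/%s", p.Namespace, p.Name)
    }
    return a.node, nil
}

func main() {
    var alg scheduleAlgorithm = &staticAlgorithm{node: "slave-1.example.com"}
    machine, err := alg.Schedule(&pod{Namespace: "default", Name: "nginx"})
    fmt.Println(machine, err) // slave-1.example.com <nil>
}

The real wiring happens in the final hunk below, where the plugin config's Algorithm field switches from &schedulerApiAlgorithmAdapter{...} to &mesosSchedulerAlgorithm{...}.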
@@ -17,12 +17,9 @@ limitations under the License.
 package scheduler
 
 import (
-    "fmt"
     "time"
 
     log "github.com/golang/glog"
-    "k8s.io/kubernetes/contrib/mesos/pkg/offers"
-    "k8s.io/kubernetes/contrib/mesos/pkg/queue"
     "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
     merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations"
@@ -31,10 +28,8 @@ import (
     types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/types"
     "k8s.io/kubernetes/pkg/api"
     apierrors "k8s.io/kubernetes/pkg/api/errors"
-    "k8s.io/kubernetes/pkg/client/cache"
     client "k8s.io/kubernetes/pkg/client/unversioned"
     plugin "k8s.io/kubernetes/plugin/pkg/scheduler"
-    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 )
 
 const (
@@ -53,115 +48,6 @@ type PluginInterface interface {
     Run(<-chan struct{})
 }
 
-// k8smSchedulingAlgorithm implements the algorithm.ScheduleAlgorithm interface
-type schedulerApiAlgorithmAdapter struct {
-    fw         types.Framework
-    podUpdates queue.FIFO
-}
-
-// Schedule implements the Scheduler interface of Kubernetes.
-// It returns the selectedMachine's name and error (if there's any).
-func (k *schedulerApiAlgorithmAdapter) Schedule(pod *api.Pod, unused algorithm.NodeLister) (string, error) {
-    log.Infof("Try to schedule pod %v\n", pod.Name)
-    ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
-
-    // default upstream scheduler passes pod.Name as binding.PodID
-    podKey, err := podtask.MakePodKey(ctx, pod.Name)
-    if err != nil {
-        return "", err
-    }
-
-    k.fw.Lock()
-    defer k.fw.Unlock()
-
-    switch task, state := k.fw.Tasks().ForPod(podKey); state {
-    case podtask.StateUnknown:
-        // There's a bit of a potential race here, a pod could have been yielded() and
-        // then before we get *here* it could be deleted.
-        // We use meta to index the pod in the store since that's what k8s reflector does.
-        podName, err := cache.MetaNamespaceKeyFunc(pod)
-        if err != nil {
-            log.Warningf("aborting Schedule, unable to understand pod object %+v", pod)
-            return "", merrors.NoSuchPodErr
-        }
-        if deleted := k.podUpdates.Poll(podName, queue.DELETE_EVENT); deleted {
-            // avoid scheduling a pod that's been deleted between yieldPod() and Schedule()
-            log.Infof("aborting Schedule, pod has been deleted %+v", pod)
-            return "", merrors.NoSuchPodErr
-        }
-        return k.doSchedule(k.fw.Tasks().Register(k.fw.CreatePodTask(ctx, pod)))
-
-    //TODO(jdef) it's possible that the pod state has diverged from what
-    //we knew previously, we should probably update the task.Pod state here
-    //before proceeding with scheduling
-    case podtask.StatePending:
-        if pod.UID != task.Pod.UID {
-            // we're dealing with a brand new pod spec here, so the old one must have been
-            // deleted -- and so our task store is out of sync w/ respect to reality
-            //TODO(jdef) reconcile task
-            return "", fmt.Errorf("task %v spec is out of sync with pod %v spec, aborting schedule", task.ID, pod.Name)
-        } else if task.Has(podtask.Launched) {
-            // task has been marked as "launched" but the pod binding creation may have failed in k8s,
-            // but we're going to let someone else handle it, probably the mesos task error handler
-            return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID)
-        } else {
-            return k.doSchedule(task, nil)
-        }
-
-    default:
-        return "", fmt.Errorf("task %s is not pending, nothing to schedule", task.ID)
-    }
-}
-
-// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
-func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (string, error) {
-    var offer offers.Perishable
-    if task.HasAcceptedOffer() {
-        // verify that the offer is still on the table
-        offerId := task.GetOfferId()
-        if offer, ok := k.fw.Offers().Get(offerId); ok && !offer.HasExpired() {
-            // skip tasks that have already have assigned offers
-            offer = task.Offer
-        } else {
-            task.Offer.Release()
-            task.Reset()
-            if err = k.fw.Tasks().Update(task); err != nil {
-                return "", err
-            }
-        }
-    }
-    if err == nil && offer == nil {
-        offer, err = k.fw.PodScheduler().SchedulePod(k.fw.Offers(), k.fw, task)
-    }
-    if err != nil {
-        return "", err
-    }
-    details := offer.Details()
-    if details == nil {
-        return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
-    }
-    slaveId := details.GetSlaveId().GetValue()
-    if slaveHostName := k.fw.SlaveHostNameFor(slaveId); slaveHostName == "" {
-        // not much sense in Release()ing the offer here since its owner died
-        offer.Release()
-        k.fw.Offers().Invalidate(details.Id.GetValue())
-        return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
-    } else {
-        if task.Offer != nil && task.Offer != offer {
-            return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
-        }
-
-        task.Offer = offer
-        k.fw.PodScheduler().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
-
-        if err := k.fw.Tasks().Update(task); err != nil {
-            offer.Release()
-            return "", err
-        }
-        return slaveHostName, nil
-    }
-}
-
 type PluginConfig struct {
     *plugin.Config
     fw types.Framework
@@ -796,7 +796,7 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se
     return &PluginConfig{
         Config: &plugin.Config{
             NodeLister: nil,
-            Algorithm: &schedulerApiAlgorithmAdapter{
+            Algorithm: &mesosSchedulerAlgorithm{
                 fw:         scheduler,
                 podUpdates: podUpdates,
             },