Move independent concepts out of the scheduler plugin into their own files
This commit is contained in:
parent b600e6c497
commit 26338dcd4d
145 contrib/mesos/pkg/scheduler/binder.go (normal file)
@@ -0,0 +1,145 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
    "fmt"
    "strconv"

    log "github.com/golang/glog"
    annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
    "k8s.io/kubernetes/pkg/api"
)

type binder struct {
    api schedulerInterface
}

// implements binding.Registry, launches the pod-associated-task in mesos
func (b *binder) Bind(binding *api.Binding) error {

    ctx := api.WithNamespace(api.NewContext(), binding.Namespace)

    // default upstream scheduler passes pod.Name as binding.Name
    podKey, err := podtask.MakePodKey(ctx, binding.Name)
    if err != nil {
        return err
    }

    b.api.Lock()
    defer b.api.Unlock()

    switch task, state := b.api.tasks().ForPod(podKey); state {
    case podtask.StatePending:
        return b.bind(ctx, binding, task)
    default:
        // in this case it's likely that the pod has been deleted between Schedule
        // and Bind calls
        log.Infof("No pending task for pod %s", podKey)
        return noSuchPodErr //TODO(jdef) this error is somewhat misleading since the task could be running?!
    }
}

func (b *binder) rollback(task *podtask.T, err error) error {
    task.Offer.Release()
    task.Reset()
    if err2 := b.api.tasks().Update(task); err2 != nil {
        log.Errorf("failed to update pod task: %v", err2)
    }
    return err
}

// assumes that: caller has acquired scheduler lock and that the task is still pending
//
// bind does not actually do the binding itself, but launches the pod as a Mesos task. The
// kubernetes executor on the slave will finally do the binding. This is different from the
// upstream scheduler in the sense that the upstream scheduler does the binding and the
// kubelet will notice that and launches the pod.
func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
    // sanity check: ensure that the task hasAcceptedOffer(), it's possible that between
    // Schedule() and now that the offer for this task was rescinded or invalidated.
    // ((we should never see this here))
    if !task.HasAcceptedOffer() {
        return fmt.Errorf("task has not accepted a valid offer %v", task.ID)
    }

    // By this time, there is a chance that the slave is disconnected.
    offerId := task.GetOfferId()
    if offer, ok := b.api.offers().Get(offerId); !ok || offer.HasExpired() {
        // already rescinded or timed out or otherwise invalidated
        return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID))
    }

    if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
        log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
            task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
        if err = b.api.launchTask(task); err == nil {
            b.api.offers().Invalidate(offerId)
            task.Set(podtask.Launched)
            if err = b.api.tasks().Update(task); err != nil {
                // this should only happen if the task has been removed or has changed status,
                // which SHOULD NOT HAPPEN as long as we're synchronizing correctly
                log.Errorf("failed to update task w/ Launched status: %v", err)
            }
            return
        }
    }
    return b.rollback(task, fmt.Errorf("Failed to launch task %v: %v", task.ID, err))
}

//TODO(jdef) unit test this, ensure that task's copy of api.Pod is not modified
func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error {
    pod := task.Pod

    // we make an effort here to avoid making changes to the task's copy of the pod, since
    // we want that to reflect the initial user spec, and not the modified spec that we
    // build for the executor to consume.
    oemCt := pod.Spec.Containers
    pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod

    if pod.Annotations == nil {
        pod.Annotations = make(map[string]string)
    }

    task.SaveRecoveryInfo(pod.Annotations)
    pod.Annotations[annotation.BindingHostKey] = task.Spec.AssignedSlave

    for _, entry := range task.Spec.PortMap {
        oemPorts := pod.Spec.Containers[entry.ContainerIdx].Ports
        ports := append([]api.ContainerPort{}, oemPorts...)
        p := &ports[entry.PortIdx]
        p.HostPort = int(entry.OfferPort)
        op := strconv.FormatUint(entry.OfferPort, 10)
        pod.Annotations[fmt.Sprintf(annotation.PortMappingKeyFormat, p.Protocol, p.ContainerPort)] = op
        if p.Name != "" {
            pod.Annotations[fmt.Sprintf(annotation.PortNameMappingKeyFormat, p.Protocol, p.Name)] = op
        }
        pod.Spec.Containers[entry.ContainerIdx].Ports = ports
    }

    // the kubelet-executor uses this to instantiate the pod
    log.V(3).Infof("prepared pod spec: %+v", pod)

    data, err := api.Codec.Encode(&pod)
    if err != nil {
        log.V(2).Infof("Failed to marshal the pod spec: %v", err)
        return err
    }
    task.Spec.Data = data
    return nil
}
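The clone-before-mod idiom in prepareTaskForLaunch is easy to get wrong: copying the pod struct alone still shares the Containers and Ports backing arrays with the task's original copy, so each slice has to be cloned before mutation. A minimal, self-contained sketch of the same idiom; Container and PodSpec here are stand-ins, not the real api types:

package main

import "fmt"

// Stand-ins for api.Container / api.PodSpec, just enough to show the idiom.
type Container struct {
    Name  string
    Ports []int
}

type PodSpec struct {
    Containers []Container
}

func main() {
    original := PodSpec{Containers: []Container{{Name: "web", Ports: []int{80}}}}

    // Copy the spec, then (shallow) clone each slice before mutating it, so the
    // original keeps reflecting the user's submitted spec.
    modified := original
    modified.Containers = append([]Container{}, original.Containers...)
    modified.Containers[0].Ports = append([]int{}, original.Containers[0].Ports...)
    modified.Containers[0].Ports[0] = 31000 // e.g. a host port taken from a Mesos offer

    fmt.Println(original.Containers[0].Ports[0]) // prints 80: untouched
    fmt.Println(modified.Containers[0].Ports[0]) // prints 31000
}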
110 contrib/mesos/pkg/scheduler/deleter.go (normal file)
@@ -0,0 +1,110 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
    "time"

    log "github.com/golang/glog"
    "k8s.io/kubernetes/contrib/mesos/pkg/queue"
    "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
    "k8s.io/kubernetes/pkg/api"
)

type deleter struct {
    api schedulerInterface
    qr  *queuer
}

// currently monitors for "pod deleted" events, upon which handle()
// is invoked.
func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
    go runtime.Until(func() {
        for {
            entry := <-updates
            pod := entry.Value().(*Pod)
            if entry.Is(queue.DELETE_EVENT) {
                if err := k.deleteOne(pod); err != nil {
                    log.Error(err)
                }
            } else if !entry.Is(queue.POP_EVENT) {
                k.qr.updatesAvailable()
            }
        }
    }, 1*time.Second, done)
}

func (k *deleter) deleteOne(pod *Pod) error {
    ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
    podKey, err := podtask.MakePodKey(ctx, pod.Name)
    if err != nil {
        return err
    }

    log.V(2).Infof("pod deleted: %v", podKey)

    // order is important here: we want to make sure we have the lock before
    // removing the pod from the scheduling queue. this makes the concurrent
    // execution of scheduler-error-handling and delete-handling easier to
    // reason about.
    k.api.Lock()
    defer k.api.Unlock()

    // prevent the scheduler from attempting to pop this; it's also possible that
    // it's concurrently being scheduled (somewhere between pod scheduling and
    // binding) - if so, then we'll end up removing it from taskRegistry which
    // will abort Bind()ing
    k.qr.dequeue(pod.GetUID())

    switch task, state := k.api.tasks().ForPod(podKey); state {
    case podtask.StateUnknown:
        log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
        return noSuchPodErr

    // determine if the task has already been launched to mesos, if not then
    // cleanup is easier (unregister) since there's no state to sync
    case podtask.StatePending:
        if !task.Has(podtask.Launched) {
            // we've been invoked in between Schedule() and Bind()
            if task.HasAcceptedOffer() {
                task.Offer.Release()
                task.Reset()
                task.Set(podtask.Deleted)
                //TODO(jdef) probably want better handling here
                if err := k.api.tasks().Update(task); err != nil {
                    return err
                }
            }
            k.api.tasks().Unregister(task)
            return nil
        }
        fallthrough

    case podtask.StateRunning:
        // signal to watchers that the related pod is going down
        task.Set(podtask.Deleted)
        if err := k.api.tasks().Update(task); err != nil {
            log.Errorf("failed to update task w/ Deleted status: %v", err)
        }
        return k.api.killTask(task.ID)

    default:
        log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)
        return noSuchTaskErr
    }
}
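deleteOne leans on a Go construct that is easy to misread: a switch whose init statement binds both the task and its state, with fallthrough so a pending task that already launched shares the running-task handling. A small, stand-alone sketch of that shape, using made-up stand-in types and values (lookup here is not the real tasks().ForPod):

package main

import "fmt"

type taskState int

const (
    statePending taskState = iota
    stateRunning
    stateUnknown
)

// lookup stands in for k.api.tasks().ForPod(podKey); the returned values are made up.
func lookup() (string, taskState) { return "task-1", statePending }

func main() {
    // Same shape as deleteOne: the init statement binds both the task and its
    // state, and fallthrough lets the pending case reuse the running-case logic.
    switch task, state := lookup(); state {
    case statePending:
        fmt.Println("pending:", task)
        fallthrough
    case stateRunning:
        fmt.Println("mark deleted and kill:", task)
    default:
        fmt.Println("no such task")
    }
}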
@@ -18,9 +18,7 @@ package scheduler

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "sync"
    "time"

@@ -45,10 +43,6 @@ import (
)

const (
    enqueuePopTimeout   = 200 * time.Millisecond
    enqueueWaitTimeout  = 1 * time.Second
    yieldPopTimeout     = 200 * time.Millisecond
    yieldWaitTimeout    = 1 * time.Second
    pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling
)

@@ -116,124 +110,6 @@ func (k *k8smScheduler) launchTask(task *podtask.T) error {
    return err
}

type binder struct {
    api schedulerInterface
}

// implements binding.Registry, launches the pod-associated-task in mesos
func (b *binder) Bind(binding *api.Binding) error {

    ctx := api.WithNamespace(api.NewContext(), binding.Namespace)

    // default upstream scheduler passes pod.Name as binding.Name
    podKey, err := podtask.MakePodKey(ctx, binding.Name)
    if err != nil {
        return err
    }

    b.api.Lock()
    defer b.api.Unlock()

    switch task, state := b.api.tasks().ForPod(podKey); state {
    case podtask.StatePending:
        return b.bind(ctx, binding, task)
    default:
        // in this case it's likely that the pod has been deleted between Schedule
        // and Bind calls
        log.Infof("No pending task for pod %s", podKey)
        return noSuchPodErr //TODO(jdef) this error is somewhat misleading since the task could be running?!
    }
}

func (b *binder) rollback(task *podtask.T, err error) error {
    task.Offer.Release()
    task.Reset()
    if err2 := b.api.tasks().Update(task); err2 != nil {
        log.Errorf("failed to update pod task: %v", err2)
    }
    return err
}

// assumes that: caller has acquired scheduler lock and that the task is still pending
//
// bind does not actually do the binding itself, but launches the pod as a Mesos task. The
// kubernetes executor on the slave will finally do the binding. This is different from the
// upstream scheduler in the sense that the upstream scheduler does the binding and the
// kubelet will notice that and launches the pod.
func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
    // sanity check: ensure that the task hasAcceptedOffer(), it's possible that between
    // Schedule() and now that the offer for this task was rescinded or invalidated.
    // ((we should never see this here))
    if !task.HasAcceptedOffer() {
        return fmt.Errorf("task has not accepted a valid offer %v", task.ID)
    }

    // By this time, there is a chance that the slave is disconnected.
    offerId := task.GetOfferId()
    if offer, ok := b.api.offers().Get(offerId); !ok || offer.HasExpired() {
        // already rescinded or timed out or otherwise invalidated
        return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID))
    }

    if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
        log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
            task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
        if err = b.api.launchTask(task); err == nil {
            b.api.offers().Invalidate(offerId)
            task.Set(podtask.Launched)
            if err = b.api.tasks().Update(task); err != nil {
                // this should only happen if the task has been removed or has changed status,
                // which SHOULD NOT HAPPEN as long as we're synchronizing correctly
                log.Errorf("failed to update task w/ Launched status: %v", err)
            }
            return
        }
    }
    return b.rollback(task, fmt.Errorf("Failed to launch task %v: %v", task.ID, err))
}

//TODO(jdef) unit test this, ensure that task's copy of api.Pod is not modified
func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error {
    pod := task.Pod

    // we make an effort here to avoid making changes to the task's copy of the pod, since
    // we want that to reflect the initial user spec, and not the modified spec that we
    // build for the executor to consume.
    oemCt := pod.Spec.Containers
    pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod

    if pod.Annotations == nil {
        pod.Annotations = make(map[string]string)
    }

    task.SaveRecoveryInfo(pod.Annotations)
    pod.Annotations[annotation.BindingHostKey] = task.Spec.AssignedSlave

    for _, entry := range task.Spec.PortMap {
        oemPorts := pod.Spec.Containers[entry.ContainerIdx].Ports
        ports := append([]api.ContainerPort{}, oemPorts...)
        p := &ports[entry.PortIdx]
        p.HostPort = int(entry.OfferPort)
        op := strconv.FormatUint(entry.OfferPort, 10)
        pod.Annotations[fmt.Sprintf(annotation.PortMappingKeyFormat, p.Protocol, p.ContainerPort)] = op
        if p.Name != "" {
            pod.Annotations[fmt.Sprintf(annotation.PortNameMappingKeyFormat, p.Protocol, p.Name)] = op
        }
        pod.Spec.Containers[entry.ContainerIdx].Ports = ports
    }

    // the kubelet-executor uses this to instantiate the pod
    log.V(3).Infof("prepared pod spec: %+v", pod)

    data, err := api.Codec.Encode(&pod)
    if err != nil {
        log.V(2).Infof("Failed to marshal the pod spec: %v", err)
        return err
    }
    task.Spec.Data = data
    return nil
}

type kubeScheduler struct {
    api        schedulerInterface
    podUpdates queue.FIFO
@@ -351,155 +227,6 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
    }
}

type queuer struct {
    lock            sync.Mutex       // shared by condition variables of this struct
    podUpdates      queue.FIFO       // queue of pod updates to be processed
    podQueue        *queue.DelayFIFO // queue of pods to be scheduled
    deltaCond       sync.Cond        // pod changes are available for processing
    unscheduledCond sync.Cond        // there are unscheduled pods for processing
}

func newQueuer(store queue.FIFO) *queuer {
    q := &queuer{
        podQueue:   queue.NewDelayFIFO(),
        podUpdates: store,
    }
    q.deltaCond.L = &q.lock
    q.unscheduledCond.L = &q.lock
    return q
}

func (q *queuer) installDebugHandlers(mux *http.ServeMux) {
    mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
        for _, x := range q.podQueue.List() {
            if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
                break
            }
        }
    })
    mux.HandleFunc("/debug/scheduler/podstore", func(w http.ResponseWriter, r *http.Request) {
        for _, x := range q.podUpdates.List() {
            if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
                break
            }
        }
    })
}

// signal that there are probably pod updates waiting to be processed
func (q *queuer) updatesAvailable() {
    q.deltaCond.Broadcast()
}

// delete a pod from the to-be-scheduled queue
func (q *queuer) dequeue(id string) {
    q.podQueue.Delete(id)
}

// re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that
// may have already changed).
func (q *queuer) requeue(pod *Pod) {
    // use KeepExisting in case the pod has already been updated (can happen if binding fails
    // due to constraint violations); we don't want to overwrite a newer entry with stale data.
    q.podQueue.Add(pod, queue.KeepExisting)
    q.unscheduledCond.Broadcast()
}

// same as requeue but calls podQueue.Offer instead of podQueue.Add
func (q *queuer) reoffer(pod *Pod) {
    // use KeepExisting in case the pod has already been updated (can happen if binding fails
    // due to constraint violations); we don't want to overwrite a newer entry with stale data.
    if q.podQueue.Offer(pod, queue.KeepExisting) {
        q.unscheduledCond.Broadcast()
    }
}

// spawns a go-routine to watch for unscheduled pods and queue them up
// for scheduling. returns immediately.
func (q *queuer) Run(done <-chan struct{}) {
    go runtime.Until(func() {
        log.Info("Watching for newly created pods")
        q.lock.Lock()
        defer q.lock.Unlock()

        for {
            // limit blocking here for short intervals so that scheduling
            // may proceed even if there have been no recent pod changes
            p := q.podUpdates.Await(enqueuePopTimeout)
            if p == nil {
                signalled := runtime.After(q.deltaCond.Wait)
                // we've yielded the lock
                select {
                case <-time.After(enqueueWaitTimeout):
                    q.deltaCond.Broadcast() // abort Wait()
                    <-signalled // wait for lock re-acquisition
                    log.V(4).Infoln("timed out waiting for a pod update")
                case <-signalled:
                    // we've acquired the lock and there may be
                    // changes for us to process now
                }
                continue
            }

            pod := p.(*Pod)
            if recoverAssignedSlave(pod.Pod) != "" {
                log.V(3).Infof("dequeuing assigned pod for scheduling: %v", pod.Pod.Name)
                q.dequeue(pod.GetUID())
            } else {
                // use ReplaceExisting because we are always pushing the latest state
                now := time.Now()
                pod.deadline = &now
                if q.podQueue.Offer(pod, queue.ReplaceExisting) {
                    q.unscheduledCond.Broadcast()
                    log.V(3).Infof("queued pod for scheduling: %v", pod.Pod.Name)
                } else {
                    log.Warningf("failed to queue pod for scheduling: %v", pod.Pod.Name)
                }
            }
        }
    }, 1*time.Second, done)
}

// implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler
func (q *queuer) yield() *api.Pod {
    log.V(2).Info("attempting to yield a pod")
    q.lock.Lock()
    defer q.lock.Unlock()

    for {
        // limit blocking here to short intervals so that we don't block the
        // enqueuer Run() routine for very long
        kpod := q.podQueue.Await(yieldPopTimeout)
        if kpod == nil {
            signalled := runtime.After(q.unscheduledCond.Wait)
            // lock is yielded at this point and we're going to wait for either
            // a timeout, or a signal that there's data
            select {
            case <-time.After(yieldWaitTimeout):
                q.unscheduledCond.Broadcast() // abort Wait()
                <-signalled // wait for the go-routine, and the lock
                log.V(4).Infoln("timed out waiting for a pod to yield")
            case <-signalled:
                // we have acquired the lock, and there
                // may be a pod for us to pop now
            }
            continue
        }

        pod := kpod.(*Pod).Pod
        if podName, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
            log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
        } else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
            log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
        } else if recoverAssignedSlave(pod) != "" {
            // should never happen if enqueuePods is filtering properly
            log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod)
        } else {
            return pod
        }
    }
}

type errorHandler struct {
    api     schedulerInterface
    backoff *backoff.Backoff
@@ -568,89 +295,6 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
    }
}

type deleter struct {
    api schedulerInterface
    qr  *queuer
}

// currently monitors for "pod deleted" events, upon which handle()
// is invoked.
func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
    go runtime.Until(func() {
        for {
            entry := <-updates
            pod := entry.Value().(*Pod)
            if entry.Is(queue.DELETE_EVENT) {
                if err := k.deleteOne(pod); err != nil {
                    log.Error(err)
                }
            } else if !entry.Is(queue.POP_EVENT) {
                k.qr.updatesAvailable()
            }
        }
    }, 1*time.Second, done)
}

func (k *deleter) deleteOne(pod *Pod) error {
    ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
    podKey, err := podtask.MakePodKey(ctx, pod.Name)
    if err != nil {
        return err
    }

    log.V(2).Infof("pod deleted: %v", podKey)

    // order is important here: we want to make sure we have the lock before
    // removing the pod from the scheduling queue. this makes the concurrent
    // execution of scheduler-error-handling and delete-handling easier to
    // reason about.
    k.api.Lock()
    defer k.api.Unlock()

    // prevent the scheduler from attempting to pop this; it's also possible that
    // it's concurrently being scheduled (somewhere between pod scheduling and
    // binding) - if so, then we'll end up removing it from taskRegistry which
    // will abort Bind()ing
    k.qr.dequeue(pod.GetUID())

    switch task, state := k.api.tasks().ForPod(podKey); state {
    case podtask.StateUnknown:
        log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
        return noSuchPodErr

    // determine if the task has already been launched to mesos, if not then
    // cleanup is easier (unregister) since there's no state to sync
    case podtask.StatePending:
        if !task.Has(podtask.Launched) {
            // we've been invoked in between Schedule() and Bind()
            if task.HasAcceptedOffer() {
                task.Offer.Release()
                task.Reset()
                task.Set(podtask.Deleted)
                //TODO(jdef) probably want better handling here
                if err := k.api.tasks().Update(task); err != nil {
                    return err
                }
            }
            k.api.tasks().Unregister(task)
            return nil
        }
        fallthrough

    case podtask.StateRunning:
        // signal to watchers that the related pod is going down
        task.Set(podtask.Deleted)
        if err := k.api.tasks().Update(task); err != nil {
            log.Errorf("failed to update task w/ Deleted status: %v", err)
        }
        return k.api.killTask(task.ID)

    default:
        log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)
        return noSuchTaskErr
    }
}

// Create creates a scheduler plugin and all supporting background functions.
func (k *KubernetesMesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig {
    // use ListWatch watching pods using the client by default
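The queuer's installDebugHandlers (removed from this file above, re-added in queuer.go below) follows a simple pattern: register read-only debug endpoints on a ServeMux that dump the current queue contents. A runnable approximation with a static slice standing in for podQueue.List(); the endpoint path matches the original, while the port is an arbitrary choice for a local test:

package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
)

func main() {
    // Stand-in for podQueue.List(); the handler body mirrors installDebugHandlers.
    queued := []string{"default/web-1", "default/web-2"}

    mux := http.NewServeMux()
    mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
        for _, x := range queued {
            if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
                break
            }
        }
    })

    // assumption: port 8080 is free on the test machine
    log.Fatal(http.ListenAndServe(":8080", mux))
}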
187 contrib/mesos/pkg/scheduler/queuer.go (normal file)
@@ -0,0 +1,187 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"

    log "github.com/golang/glog"
    "k8s.io/kubernetes/contrib/mesos/pkg/queue"
    "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/client/cache"
)

const (
    enqueuePopTimeout  = 200 * time.Millisecond
    enqueueWaitTimeout = 1 * time.Second
    yieldPopTimeout    = 200 * time.Millisecond
    yieldWaitTimeout   = 1 * time.Second
)

type queuer struct {
    lock            sync.Mutex       // shared by condition variables of this struct
    podUpdates      queue.FIFO       // queue of pod updates to be processed
    podQueue        *queue.DelayFIFO // queue of pods to be scheduled
    deltaCond       sync.Cond        // pod changes are available for processing
    unscheduledCond sync.Cond        // there are unscheduled pods for processing
}

func newQueuer(store queue.FIFO) *queuer {
    q := &queuer{
        podQueue:   queue.NewDelayFIFO(),
        podUpdates: store,
    }
    q.deltaCond.L = &q.lock
    q.unscheduledCond.L = &q.lock
    return q
}

func (q *queuer) installDebugHandlers(mux *http.ServeMux) {
    mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
        for _, x := range q.podQueue.List() {
            if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
                break
            }
        }
    })
    mux.HandleFunc("/debug/scheduler/podstore", func(w http.ResponseWriter, r *http.Request) {
        for _, x := range q.podUpdates.List() {
            if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
                break
            }
        }
    })
}

// signal that there are probably pod updates waiting to be processed
func (q *queuer) updatesAvailable() {
    q.deltaCond.Broadcast()
}

// delete a pod from the to-be-scheduled queue
func (q *queuer) dequeue(id string) {
    q.podQueue.Delete(id)
}

// re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that
// may have already changed).
func (q *queuer) requeue(pod *Pod) {
    // use KeepExisting in case the pod has already been updated (can happen if binding fails
    // due to constraint violations); we don't want to overwrite a newer entry with stale data.
    q.podQueue.Add(pod, queue.KeepExisting)
    q.unscheduledCond.Broadcast()
}

// same as requeue but calls podQueue.Offer instead of podQueue.Add
func (q *queuer) reoffer(pod *Pod) {
    // use KeepExisting in case the pod has already been updated (can happen if binding fails
    // due to constraint violations); we don't want to overwrite a newer entry with stale data.
    if q.podQueue.Offer(pod, queue.KeepExisting) {
        q.unscheduledCond.Broadcast()
    }
}

// spawns a go-routine to watch for unscheduled pods and queue them up
// for scheduling. returns immediately.
func (q *queuer) Run(done <-chan struct{}) {
    go runtime.Until(func() {
        log.Info("Watching for newly created pods")
        q.lock.Lock()
        defer q.lock.Unlock()

        for {
            // limit blocking here for short intervals so that scheduling
            // may proceed even if there have been no recent pod changes
            p := q.podUpdates.Await(enqueuePopTimeout)
            if p == nil {
                signalled := runtime.After(q.deltaCond.Wait)
                // we've yielded the lock
                select {
                case <-time.After(enqueueWaitTimeout):
                    q.deltaCond.Broadcast() // abort Wait()
                    <-signalled // wait for lock re-acquisition
                    log.V(4).Infoln("timed out waiting for a pod update")
                case <-signalled:
                    // we've acquired the lock and there may be
                    // changes for us to process now
                }
                continue
            }

            pod := p.(*Pod)
            if recoverAssignedSlave(pod.Pod) != "" {
                log.V(3).Infof("dequeuing assigned pod for scheduling: %v", pod.Pod.Name)
                q.dequeue(pod.GetUID())
            } else {
                // use ReplaceExisting because we are always pushing the latest state
                now := time.Now()
                pod.deadline = &now
                if q.podQueue.Offer(pod, queue.ReplaceExisting) {
                    q.unscheduledCond.Broadcast()
                    log.V(3).Infof("queued pod for scheduling: %v", pod.Pod.Name)
                } else {
                    log.Warningf("failed to queue pod for scheduling: %v", pod.Pod.Name)
                }
            }
        }
    }, 1*time.Second, done)
}

// implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler
func (q *queuer) yield() *api.Pod {
    log.V(2).Info("attempting to yield a pod")
    q.lock.Lock()
    defer q.lock.Unlock()

    for {
        // limit blocking here to short intervals so that we don't block the
        // enqueuer Run() routine for very long
        kpod := q.podQueue.Await(yieldPopTimeout)
        if kpod == nil {
            signalled := runtime.After(q.unscheduledCond.Wait)
            // lock is yielded at this point and we're going to wait for either
            // a timeout, or a signal that there's data
            select {
            case <-time.After(yieldWaitTimeout):
                q.unscheduledCond.Broadcast() // abort Wait()
                <-signalled // wait for the go-routine, and the lock
                log.V(4).Infoln("timed out waiting for a pod to yield")
            case <-signalled:
                // we have acquired the lock, and there
                // may be a pod for us to pop now
            }
            continue
        }

        pod := kpod.(*Pod).Pod
        if podName, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
            log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
        } else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
            log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
        } else if recoverAssignedSlave(pod) != "" {
            // should never happen if enqueuePods is filtering properly
            log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod)
        } else {
            return pod
        }
    }
}
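queuer.Run and queuer.yield both use the same timeout-bounded condition-variable wait: run cond.Wait in a helper goroutine, then select on either a timer or the channel that closes when Wait returns. A self-contained sketch of that shape; the local after helper mimics what runtime.After appears to do here (an assumption based on usage, not on that package's source):

package main

import (
    "fmt"
    "sync"
    "time"
)

// after runs f in a goroutine and closes the returned channel once f returns.
// Assumption: this mirrors contrib/mesos' runtime.After as used above.
func after(f func()) <-chan struct{} {
    ch := make(chan struct{})
    go func() {
        defer close(ch)
        f()
    }()
    return ch
}

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)

    mu.Lock()
    defer mu.Unlock()

    // Wait for a broadcast, but never longer than the timeout: the same shape
    // as the select blocks in queuer.Run and queuer.yield.
    signalled := after(cond.Wait) // Wait releases mu while it blocks
    select {
    case <-time.After(200 * time.Millisecond):
        cond.Broadcast() // abort Wait()
        <-signalled      // wait until Wait() has re-acquired mu and returned
        fmt.Println("timed out waiting for a signal")
    case <-signalled:
        fmt.Println("woken by a broadcast; lock is held again")
    }
}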