From 26338dcd4d7c2b5f7d60b1cca905c252638bcf28 Mon Sep 17 00:00:00 2001
From: "Dr. Stefan Schimanski"
Date: Sun, 25 Oct 2015 10:56:40 -0700
Subject: [PATCH] Move independent concepts out of the scheduler plugin into
 their own files

---
 contrib/mesos/pkg/scheduler/binder.go  | 145 ++++++++++
 contrib/mesos/pkg/scheduler/deleter.go | 110 ++++++++
 contrib/mesos/pkg/scheduler/plugin.go  | 356 -------------------------
 contrib/mesos/pkg/scheduler/queuer.go  | 187 +++++++++++++
 4 files changed, 442 insertions(+), 356 deletions(-)
 create mode 100644 contrib/mesos/pkg/scheduler/binder.go
 create mode 100644 contrib/mesos/pkg/scheduler/deleter.go
 create mode 100644 contrib/mesos/pkg/scheduler/queuer.go

diff --git a/contrib/mesos/pkg/scheduler/binder.go b/contrib/mesos/pkg/scheduler/binder.go
new file mode 100644
index 00000000000..7ec4d30fb23
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/binder.go
@@ -0,0 +1,145 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"fmt"
+	"strconv"
+
+	log "github.com/golang/glog"
+	annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	"k8s.io/kubernetes/pkg/api"
+)
+
+type binder struct {
+	api schedulerInterface
+}
+
+// implements binding.Registry, launches the pod-associated task in Mesos
+func (b *binder) Bind(binding *api.Binding) error {
+
+	ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
+
+	// default upstream scheduler passes pod.Name as binding.Name
+	podKey, err := podtask.MakePodKey(ctx, binding.Name)
+	if err != nil {
+		return err
+	}
+
+	b.api.Lock()
+	defer b.api.Unlock()
+
+	switch task, state := b.api.tasks().ForPod(podKey); state {
+	case podtask.StatePending:
+		return b.bind(ctx, binding, task)
+	default:
+		// in this case it's likely that the pod has been deleted between Schedule
+		// and Bind calls
+		log.Infof("No pending task for pod %s", podKey)
+		return noSuchPodErr //TODO(jdef) this error is somewhat misleading since the task could be running?!
+	}
+}
+
+func (b *binder) rollback(task *podtask.T, err error) error {
+	task.Offer.Release()
+	task.Reset()
+	if err2 := b.api.tasks().Update(task); err2 != nil {
+		log.Errorf("failed to update pod task: %v", err2)
+	}
+	return err
+}
+
+// assumes that: caller has acquired scheduler lock and that the task is still pending
+//
+// bind does not actually do the binding itself, but launches the pod as a Mesos task. The
+// Kubernetes executor on the slave will do the final binding. This is different from the
+// upstream scheduler in the sense that the upstream scheduler does the binding and the
+// kubelet will notice that and launch the pod.
+func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
+	// sanity check: ensure that the task HasAcceptedOffer(); it's possible that between
+	// Schedule() and now the offer for this task was rescinded or invalidated.
+	// ((we should never see this here))
+	if !task.HasAcceptedOffer() {
+		return fmt.Errorf("task has not accepted a valid offer %v", task.ID)
+	}
+
+	// By this time, there is a chance that the slave is disconnected.
+	offerId := task.GetOfferId()
+	if offer, ok := b.api.offers().Get(offerId); !ok || offer.HasExpired() {
+		// already rescinded or timed out or otherwise invalidated
+		return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID))
+	}
+
+	if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
+		log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
+			task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
+		if err = b.api.launchTask(task); err == nil {
+			b.api.offers().Invalidate(offerId)
+			task.Set(podtask.Launched)
+			if err = b.api.tasks().Update(task); err != nil {
+				// this should only happen if the task has been removed or has changed status,
+				// which SHOULD NOT HAPPEN as long as we're synchronizing correctly
+				log.Errorf("failed to update task w/ Launched status: %v", err)
+			}
+			return
+		}
+	}
+	return b.rollback(task, fmt.Errorf("failed to launch task %v: %v", task.ID, err))
+}
+
+//TODO(jdef) unit test this, ensure that task's copy of api.Pod is not modified
+func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error {
+	pod := task.Pod
+
+	// we make an effort here to avoid making changes to the task's copy of the pod, since
+	// we want that to reflect the initial user spec, and not the modified spec that we
+	// build for the executor to consume.
+	oemCt := pod.Spec.Containers
+	pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod
+
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
+
+	task.SaveRecoveryInfo(pod.Annotations)
+	pod.Annotations[annotation.BindingHostKey] = task.Spec.AssignedSlave
+
+	for _, entry := range task.Spec.PortMap {
+		oemPorts := pod.Spec.Containers[entry.ContainerIdx].Ports
+		ports := append([]api.ContainerPort{}, oemPorts...)
+		p := &ports[entry.PortIdx]
+		p.HostPort = int(entry.OfferPort)
+		op := strconv.FormatUint(entry.OfferPort, 10)
+		pod.Annotations[fmt.Sprintf(annotation.PortMappingKeyFormat, p.Protocol, p.ContainerPort)] = op
+		if p.Name != "" {
+			pod.Annotations[fmt.Sprintf(annotation.PortNameMappingKeyFormat, p.Protocol, p.Name)] = op
+		}
+		pod.Spec.Containers[entry.ContainerIdx].Ports = ports
+	}
+
+	// the kubelet-executor uses this to instantiate the pod
+	log.V(3).Infof("prepared pod spec: %+v", pod)
+
+	data, err := api.Codec.Encode(&pod)
+	if err != nil {
+		log.V(2).Infof("Failed to marshal the pod spec: %v", err)
+		return err
+	}
+	task.Spec.Data = data
+	return nil
+}
\ No newline at end of file
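
As a reading aid for the extracted binder above: the essential shape of bind()'s
failure handling is "undo partial state, then surface the original error". Below
is a minimal, self-contained Go sketch of that rollback pattern; every name in
it (task, rollback, launch) is a stand-in for illustration, not part of this
package's real API.

	package main

	import (
		"errors"
		"fmt"
	)

	// task is a stand-in for podtask.T: just enough state to show the rollback.
	type task struct {
		id       string
		hasOffer bool
		launched bool
	}

	// rollback releases held resources and resets state, then returns the
	// original error unchanged, mirroring binder.rollback above.
	func rollback(t *task, err error) error {
		t.hasOffer = false
		t.launched = false
		return err
	}

	// launch mimics bind(): sanity-check the offer, then undo on any failure.
	func launch(t *task, fail bool) error {
		if !t.hasOffer {
			return fmt.Errorf("task has not accepted a valid offer %v", t.id)
		}
		if fail {
			// any failure after the offer was accepted must undo partial state
			return rollback(t, errors.New("launchTask failed"))
		}
		t.launched = true
		return nil
	}

	func main() {
		t := &task{id: "task-1", hasOffer: true}
		err := launch(t, true)
		fmt.Printf("err=%v launched=%v offerHeld=%v\n", err, t.launched, t.hasOffer)
	}
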
diff --git a/contrib/mesos/pkg/scheduler/deleter.go b/contrib/mesos/pkg/scheduler/deleter.go
new file mode 100644
index 00000000000..d9b59992892
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/deleter.go
@@ -0,0 +1,110 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"time"
+
+	log "github.com/golang/glog"
+	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
+	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	"k8s.io/kubernetes/pkg/api"
+)
+
+type deleter struct {
+	api schedulerInterface
+	qr  *queuer
+}
+
+// currently monitors for "pod deleted" events, upon which deleteOne()
+// is invoked.
+func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
+	go runtime.Until(func() {
+		for {
+			entry := <-updates
+			pod := entry.Value().(*Pod)
+			if entry.Is(queue.DELETE_EVENT) {
+				if err := k.deleteOne(pod); err != nil {
+					log.Error(err)
+				}
+			} else if !entry.Is(queue.POP_EVENT) {
+				k.qr.updatesAvailable()
+			}
+		}
+	}, 1*time.Second, done)
+}
+
+func (k *deleter) deleteOne(pod *Pod) error {
+	ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
+	podKey, err := podtask.MakePodKey(ctx, pod.Name)
+	if err != nil {
+		return err
+	}
+
+	log.V(2).Infof("pod deleted: %v", podKey)
+
+	// order is important here: we want to make sure we have the lock before
+	// removing the pod from the scheduling queue. this makes the concurrent
+	// execution of scheduler-error-handling and delete-handling easier to
+	// reason about.
+	k.api.Lock()
+	defer k.api.Unlock()
+
+	// prevent the scheduler from attempting to pop this; it's also possible that
+	// it's concurrently being scheduled (somewhere between pod scheduling and
+	// binding) - if so, then we'll end up removing it from taskRegistry which
+	// will abort Bind()ing
+	k.qr.dequeue(pod.GetUID())
+
+	switch task, state := k.api.tasks().ForPod(podKey); state {
+	case podtask.StateUnknown:
+		log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
+		return noSuchPodErr
+
+	// determine if the task has already been launched to mesos, if not then
+	// cleanup is easier (unregister) since there's no state to sync
+	case podtask.StatePending:
+		if !task.Has(podtask.Launched) {
+			// we've been invoked in between Schedule() and Bind()
+			if task.HasAcceptedOffer() {
+				task.Offer.Release()
+				task.Reset()
+				task.Set(podtask.Deleted)
+				//TODO(jdef) probably want better handling here
+				if err := k.api.tasks().Update(task); err != nil {
+					return err
+				}
+			}
+			k.api.tasks().Unregister(task)
+			return nil
+		}
+		fallthrough
+
+	case podtask.StateRunning:
+		// signal to watchers that the related pod is going down
+		task.Set(podtask.Deleted)
+		if err := k.api.tasks().Update(task); err != nil {
+			log.Errorf("failed to update task w/ Deleted status: %v", err)
+		}
+		return k.api.killTask(task.ID)
+
+	default:
+		log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)
+		return noSuchTaskErr
+	}
+}
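
The deleter's Run() above is a small event pump: it blocks on an updates channel
and dispatches on the event kind, sending deletions to deleteOne() and nudging
the queuer for anything other than a pop. A self-contained sketch of that
dispatch shape, with hypothetical event kinds standing in for the queue
package's constants:

	package main

	import (
		"fmt"
		"time"
	)

	// eventKind is a stand-in for the queue package's event constants.
	type eventKind int

	const (
		deleteEvent eventKind = iota // cf. queue.DELETE_EVENT
		popEvent                     // cf. queue.POP_EVENT
		otherEvent                   // any other pod change
	)

	type entry struct {
		kind eventKind
		pod  string
	}

	// run drains updates until done is closed, mirroring deleter.Run's dispatch:
	// deletions are handled directly, pops are ignored, everything else wakes
	// the queuer.
	func run(updates <-chan entry, done <-chan struct{}) {
		for {
			select {
			case e := <-updates:
				switch e.kind {
				case deleteEvent:
					fmt.Println("deleteOne:", e.pod)
				case popEvent:
					// pops are consumed by the scheduler itself; nothing to do
				default:
					fmt.Println("updatesAvailable: wake the queuer")
				}
			case <-done:
				return
			}
		}
	}

	func main() {
		updates := make(chan entry, 3)
		done := make(chan struct{})
		go run(updates, done)
		updates <- entry{otherEvent, "default/pod-a"}
		updates <- entry{deleteEvent, "default/pod-a"}
		updates <- entry{popEvent, "default/pod-b"}
		time.Sleep(100 * time.Millisecond) // let the pump drain (sketch only)
		close(done)
	}
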
diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go
index ed219e2645c..b227912590b 100644
--- a/contrib/mesos/pkg/scheduler/plugin.go
+++ b/contrib/mesos/pkg/scheduler/plugin.go
@@ -18,9 +18,7 @@ package scheduler
 
 import (
 	"fmt"
-	"io"
 	"net/http"
-	"strconv"
 	"sync"
 	"time"
 
@@ -45,10 +43,6 @@ import (
 )
 
 const (
-	enqueuePopTimeout   = 200 * time.Millisecond
-	enqueueWaitTimeout  = 1 * time.Second
-	yieldPopTimeout     = 200 * time.Millisecond
-	yieldWaitTimeout    = 1 * time.Second
 	pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling
 )
 
@@ -116,124 +110,6 @@ func (k *k8smScheduler) launchTask(task *podtask.T) error {
 	return err
 }
 
-type binder struct {
-	api schedulerInterface
-}
-
-// implements binding.Registry, launches the pod-associated-task in mesos
-func (b *binder) Bind(binding *api.Binding) error {
-
-	ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
-
-	// default upstream scheduler passes pod.Name as binding.Name
-	podKey, err := podtask.MakePodKey(ctx, binding.Name)
-	if err != nil {
-		return err
-	}
-
-	b.api.Lock()
-	defer b.api.Unlock()
-
-	switch task, state := b.api.tasks().ForPod(podKey); state {
-	case podtask.StatePending:
-		return b.bind(ctx, binding, task)
-	default:
-		// in this case it's likely that the pod has been deleted between Schedule
-		// and Bind calls
-		log.Infof("No pending task for pod %s", podKey)
-		return noSuchPodErr //TODO(jdef) this error is somewhat misleading since the task could be running?!
-	}
-}
-
-func (b *binder) rollback(task *podtask.T, err error) error {
-	task.Offer.Release()
-	task.Reset()
-	if err2 := b.api.tasks().Update(task); err2 != nil {
-		log.Errorf("failed to update pod task: %v", err2)
-	}
-	return err
-}
-
-// assumes that: caller has acquired scheduler lock and that the task is still pending
-//
-// bind does not actually do the binding itself, but launches the pod as a Mesos task. The
-// kubernetes executor on the slave will finally do the binding. This is different from the
-// upstream scheduler in the sense that the upstream scheduler does the binding and the
-// kubelet will notice that and launches the pod.
-func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
-	// sanity check: ensure that the task hasAcceptedOffer(), it's possible that between
-	// Schedule() and now that the offer for this task was rescinded or invalidated.
-	// ((we should never see this here))
-	if !task.HasAcceptedOffer() {
-		return fmt.Errorf("task has not accepted a valid offer %v", task.ID)
-	}
-
-	// By this time, there is a chance that the slave is disconnected.
-	offerId := task.GetOfferId()
-	if offer, ok := b.api.offers().Get(offerId); !ok || offer.HasExpired() {
-		// already rescinded or timed out or otherwise invalidated
-		return b.rollback(task, fmt.Errorf("failed prior to launchTask due to expired offer for task %v", task.ID))
-	}
-
-	if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
-		log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
-			task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
-		if err = b.api.launchTask(task); err == nil {
-			b.api.offers().Invalidate(offerId)
-			task.Set(podtask.Launched)
-			if err = b.api.tasks().Update(task); err != nil {
-				// this should only happen if the task has been removed or has changed status,
-				// which SHOULD NOT HAPPEN as long as we're synchronizing correctly
-				log.Errorf("failed to update task w/ Launched status: %v", err)
-			}
-			return
-		}
-	}
-	return b.rollback(task, fmt.Errorf("Failed to launch task %v: %v", task.ID, err))
-}
-
-//TODO(jdef) unit test this, ensure that task's copy of api.Pod is not modified
-func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *podtask.T, offerId string) error {
-	pod := task.Pod
-
-	// we make an effort here to avoid making changes to the task's copy of the pod, since
-	// we want that to reflect the initial user spec, and not the modified spec that we
-	// build for the executor to consume.
-	oemCt := pod.Spec.Containers
-	pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod
-
-	if pod.Annotations == nil {
-		pod.Annotations = make(map[string]string)
-	}
-
-	task.SaveRecoveryInfo(pod.Annotations)
-	pod.Annotations[annotation.BindingHostKey] = task.Spec.AssignedSlave
-
-	for _, entry := range task.Spec.PortMap {
-		oemPorts := pod.Spec.Containers[entry.ContainerIdx].Ports
-		ports := append([]api.ContainerPort{}, oemPorts...)
-		p := &ports[entry.PortIdx]
-		p.HostPort = int(entry.OfferPort)
-		op := strconv.FormatUint(entry.OfferPort, 10)
-		pod.Annotations[fmt.Sprintf(annotation.PortMappingKeyFormat, p.Protocol, p.ContainerPort)] = op
-		if p.Name != "" {
-			pod.Annotations[fmt.Sprintf(annotation.PortNameMappingKeyFormat, p.Protocol, p.Name)] = op
-		}
-		pod.Spec.Containers[entry.ContainerIdx].Ports = ports
-	}
-
-	// the kubelet-executor uses this to instantiate the pod
-	log.V(3).Infof("prepared pod spec: %+v", pod)
-
-	data, err := api.Codec.Encode(&pod)
-	if err != nil {
-		log.V(2).Infof("Failed to marshal the pod spec: %v", err)
-		return err
-	}
-	task.Spec.Data = data
-	return nil
-}
-
 type kubeScheduler struct {
 	api        schedulerInterface
 	podUpdates queue.FIFO
@@ -351,155 +227,6 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
 	}
 }
 
-type queuer struct {
-	lock            sync.Mutex       // shared by condition variables of this struct
-	podUpdates      queue.FIFO       // queue of pod updates to be processed
-	podQueue        *queue.DelayFIFO // queue of pods to be scheduled
-	deltaCond       sync.Cond        // pod changes are available for processing
-	unscheduledCond sync.Cond        // there are unscheduled pods for processing
-}
-
-func newQueuer(store queue.FIFO) *queuer {
-	q := &queuer{
-		podQueue:   queue.NewDelayFIFO(),
-		podUpdates: store,
-	}
-	q.deltaCond.L = &q.lock
-	q.unscheduledCond.L = &q.lock
-	return q
-}
-
-func (q *queuer) installDebugHandlers(mux *http.ServeMux) {
-	mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
-		for _, x := range q.podQueue.List() {
-			if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
-				break
-			}
-		}
-	})
-	mux.HandleFunc("/debug/scheduler/podstore", func(w http.ResponseWriter, r *http.Request) {
-		for _, x := range q.podUpdates.List() {
-			if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
-				break
-			}
-		}
-	})
-}
-
-// signal that there are probably pod updates waiting to be processed
-func (q *queuer) updatesAvailable() {
-	q.deltaCond.Broadcast()
-}
-
-// delete a pod from the to-be-scheduled queue
-func (q *queuer) dequeue(id string) {
-	q.podQueue.Delete(id)
-}
-
-// re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that
-// may have already changed).
-func (q *queuer) requeue(pod *Pod) {
-	// use KeepExisting in case the pod has already been updated (can happen if binding fails
-	// due to constraint voilations); we don't want to overwrite a newer entry with stale data.
-	q.podQueue.Add(pod, queue.KeepExisting)
-	q.unscheduledCond.Broadcast()
-}
-
-// same as requeue but calls podQueue.Offer instead of podQueue.Add
-func (q *queuer) reoffer(pod *Pod) {
-	// use KeepExisting in case the pod has already been updated (can happen if binding fails
-	// due to constraint voilations); we don't want to overwrite a newer entry with stale data.
-	if q.podQueue.Offer(pod, queue.KeepExisting) {
-		q.unscheduledCond.Broadcast()
-	}
-}
-
-// spawns a go-routine to watch for unscheduled pods and queue them up
-// for scheduling. returns immediately.
-func (q *queuer) Run(done <-chan struct{}) {
-	go runtime.Until(func() {
-		log.Info("Watching for newly created pods")
-		q.lock.Lock()
-		defer q.lock.Unlock()
-
-		for {
-			// limit blocking here for short intervals so that scheduling
-			// may proceed even if there have been no recent pod changes
-			p := q.podUpdates.Await(enqueuePopTimeout)
-			if p == nil {
-				signalled := runtime.After(q.deltaCond.Wait)
-				// we've yielded the lock
-				select {
-				case <-time.After(enqueueWaitTimeout):
-					q.deltaCond.Broadcast() // abort Wait()
-					<-signalled             // wait for lock re-acquisition
-					log.V(4).Infoln("timed out waiting for a pod update")
-				case <-signalled:
-					// we've acquired the lock and there may be
-					// changes for us to process now
-				}
-				continue
-			}
-
-			pod := p.(*Pod)
-			if recoverAssignedSlave(pod.Pod) != "" {
-				log.V(3).Infof("dequeuing assigned pod for scheduling: %v", pod.Pod.Name)
-				q.dequeue(pod.GetUID())
-			} else {
-				// use ReplaceExisting because we are always pushing the latest state
-				now := time.Now()
-				pod.deadline = &now
-				if q.podQueue.Offer(pod, queue.ReplaceExisting) {
-					q.unscheduledCond.Broadcast()
-					log.V(3).Infof("queued pod for scheduling: %v", pod.Pod.Name)
-				} else {
-					log.Warningf("failed to queue pod for scheduling: %v", pod.Pod.Name)
-				}
-			}
-		}
-	}, 1*time.Second, done)
-}
-
-// implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler
-func (q *queuer) yield() *api.Pod {
-	log.V(2).Info("attempting to yield a pod")
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	for {
-		// limit blocking here to short intervals so that we don't block the
-		// enqueuer Run() routine for very long
-		kpod := q.podQueue.Await(yieldPopTimeout)
-		if kpod == nil {
-			signalled := runtime.After(q.unscheduledCond.Wait)
-			// lock is yielded at this point and we're going to wait for either
-			// a timeout, or a signal that there's data
-			select {
-			case <-time.After(yieldWaitTimeout):
-				q.unscheduledCond.Broadcast() // abort Wait()
-				<-signalled                   // wait for the go-routine, and the lock
-				log.V(4).Infoln("timed out waiting for a pod to yield")
-			case <-signalled:
-				// we have acquired the lock, and there
-				// may be a pod for us to pop now
-			}
-			continue
-		}
-
-		pod := kpod.(*Pod).Pod
-		if podName, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
-			log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
-		} else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
-			log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
-		} else if recoverAssignedSlave(pod) != "" {
-			// should never happen if enqueuePods is filtering properly
-			log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod)
-		} else {
-			return pod
-		}
-	}
-}
-
 type errorHandler struct {
 	api     schedulerInterface
 	backoff *backoff.Backoff
@@ -568,89 +295,6 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) {
 	}
 }
 
-type deleter struct {
-	api schedulerInterface
-	qr  *queuer
-}
-
-// currently monitors for "pod deleted" events, upon which handle()
-// is invoked.
-func (k *deleter) Run(updates <-chan queue.Entry, done <-chan struct{}) {
-	go runtime.Until(func() {
-		for {
-			entry := <-updates
-			pod := entry.Value().(*Pod)
-			if entry.Is(queue.DELETE_EVENT) {
-				if err := k.deleteOne(pod); err != nil {
-					log.Error(err)
-				}
-			} else if !entry.Is(queue.POP_EVENT) {
-				k.qr.updatesAvailable()
-			}
-		}
-	}, 1*time.Second, done)
-}
-
-func (k *deleter) deleteOne(pod *Pod) error {
-	ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
-	podKey, err := podtask.MakePodKey(ctx, pod.Name)
-	if err != nil {
-		return err
-	}
-
-	log.V(2).Infof("pod deleted: %v", podKey)
-
-	// order is important here: we want to make sure we have the lock before
-	// removing the pod from the scheduling queue. this makes the concurrent
-	// execution of scheduler-error-handling and delete-handling easier to
-	// reason about.
-	k.api.Lock()
-	defer k.api.Unlock()
-
-	// prevent the scheduler from attempting to pop this; it's also possible that
-	// it's concurrently being scheduled (somewhere between pod scheduling and
-	// binding) - if so, then we'll end up removing it from taskRegistry which
-	// will abort Bind()ing
-	k.qr.dequeue(pod.GetUID())
-
-	switch task, state := k.api.tasks().ForPod(podKey); state {
-	case podtask.StateUnknown:
-		log.V(2).Infof("Could not resolve pod '%s' to task id", podKey)
-		return noSuchPodErr
-
-	// determine if the task has already been launched to mesos, if not then
-	// cleanup is easier (unregister) since there's no state to sync
-	case podtask.StatePending:
-		if !task.Has(podtask.Launched) {
-			// we've been invoked in between Schedule() and Bind()
-			if task.HasAcceptedOffer() {
-				task.Offer.Release()
-				task.Reset()
-				task.Set(podtask.Deleted)
-				//TODO(jdef) probably want better handling here
-				if err := k.api.tasks().Update(task); err != nil {
-					return err
-				}
-			}
-			k.api.tasks().Unregister(task)
-			return nil
-		}
-		fallthrough
-
-	case podtask.StateRunning:
-		// signal to watchers that the related pod is going down
-		task.Set(podtask.Deleted)
-		if err := k.api.tasks().Update(task); err != nil {
-			log.Errorf("failed to update task w/ Deleted status: %v", err)
-		}
-		return k.api.killTask(task.ID)
-
-	default:
-		log.Infof("cannot kill pod '%s': non-terminal task not found %v", podKey, task.ID)
-		return noSuchTaskErr
-	}
-}
-
 // Create creates a scheduler plugin and all supporting background functions.
 func (k *KubernetesMesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig {
 	// use ListWatch watching pods using the client by default
diff --git a/contrib/mesos/pkg/scheduler/queuer.go b/contrib/mesos/pkg/scheduler/queuer.go
new file mode 100644
index 00000000000..ef2b96f3f86
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/queuer.go
@@ -0,0 +1,187 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"sync"
+	"time"
+
+	log "github.com/golang/glog"
+	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
+	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
+)
+
+const (
+	enqueuePopTimeout  = 200 * time.Millisecond
+	enqueueWaitTimeout = 1 * time.Second
+	yieldPopTimeout    = 200 * time.Millisecond
+	yieldWaitTimeout   = 1 * time.Second
+)
+
+type queuer struct {
+	lock            sync.Mutex       // shared by condition variables of this struct
+	podUpdates      queue.FIFO       // queue of pod updates to be processed
+	podQueue        *queue.DelayFIFO // queue of pods to be scheduled
+	deltaCond       sync.Cond        // pod changes are available for processing
+	unscheduledCond sync.Cond        // there are unscheduled pods for processing
+}
+
+func newQueuer(store queue.FIFO) *queuer {
+	q := &queuer{
+		podQueue:   queue.NewDelayFIFO(),
+		podUpdates: store,
+	}
+	q.deltaCond.L = &q.lock
+	q.unscheduledCond.L = &q.lock
+	return q
+}
+
+func (q *queuer) installDebugHandlers(mux *http.ServeMux) {
+	mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
+		for _, x := range q.podQueue.List() {
+			if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
+				break
+			}
+		}
+	})
+	mux.HandleFunc("/debug/scheduler/podstore", func(w http.ResponseWriter, r *http.Request) {
+		for _, x := range q.podUpdates.List() {
+			if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
+				break
+			}
+		}
+	})
+}
+
+// signal that there are probably pod updates waiting to be processed
+func (q *queuer) updatesAvailable() {
+	q.deltaCond.Broadcast()
+}
+
+// delete a pod from the to-be-scheduled queue
+func (q *queuer) dequeue(id string) {
+	q.podQueue.Delete(id)
+}
+
+// re-add a pod to the to-be-scheduled queue; will not overwrite existing pod data (that
+// may have already changed).
+func (q *queuer) requeue(pod *Pod) {
+	// use KeepExisting in case the pod has already been updated (can happen if binding fails
+	// due to constraint violations); we don't want to overwrite a newer entry with stale data.
+	q.podQueue.Add(pod, queue.KeepExisting)
+	q.unscheduledCond.Broadcast()
+}
+
+// same as requeue but calls podQueue.Offer instead of podQueue.Add
+func (q *queuer) reoffer(pod *Pod) {
+	// use KeepExisting in case the pod has already been updated (can happen if binding fails
+	// due to constraint violations); we don't want to overwrite a newer entry with stale data.
+	if q.podQueue.Offer(pod, queue.KeepExisting) {
+		q.unscheduledCond.Broadcast()
+	}
+}
+
+// spawns a go-routine to watch for unscheduled pods and queue them up
+// for scheduling. returns immediately.
+func (q *queuer) Run(done <-chan struct{}) {
+	go runtime.Until(func() {
+		log.Info("Watching for newly created pods")
+		q.lock.Lock()
+		defer q.lock.Unlock()
+
+		for {
+			// limit blocking here for short intervals so that scheduling
+			// may proceed even if there have been no recent pod changes
+			p := q.podUpdates.Await(enqueuePopTimeout)
+			if p == nil {
+				signalled := runtime.After(q.deltaCond.Wait)
+				// we've yielded the lock
+				select {
+				case <-time.After(enqueueWaitTimeout):
+					q.deltaCond.Broadcast() // abort Wait()
+					<-signalled             // wait for lock re-acquisition
+					log.V(4).Infoln("timed out waiting for a pod update")
+				case <-signalled:
+					// we've acquired the lock and there may be
+					// changes for us to process now
+				}
+				continue
+			}
+
+			pod := p.(*Pod)
+			if recoverAssignedSlave(pod.Pod) != "" {
+				log.V(3).Infof("dequeuing assigned pod for scheduling: %v", pod.Pod.Name)
+				q.dequeue(pod.GetUID())
+			} else {
+				// use ReplaceExisting because we are always pushing the latest state
+				now := time.Now()
+				pod.deadline = &now
+				if q.podQueue.Offer(pod, queue.ReplaceExisting) {
+					q.unscheduledCond.Broadcast()
+					log.V(3).Infof("queued pod for scheduling: %v", pod.Pod.Name)
+				} else {
+					log.Warningf("failed to queue pod for scheduling: %v", pod.Pod.Name)
+				}
+			}
+		}
+	}, 1*time.Second, done)
+}
+
+// implementation of scheduling plugin's NextPod func; see k8s plugin/pkg/scheduler
+func (q *queuer) yield() *api.Pod {
+	log.V(2).Info("attempting to yield a pod")
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	for {
+		// limit blocking here to short intervals so that we don't block the
+		// enqueuer Run() routine for very long
+		kpod := q.podQueue.Await(yieldPopTimeout)
+		if kpod == nil {
+			signalled := runtime.After(q.unscheduledCond.Wait)
+			// lock is yielded at this point and we're going to wait for either
+			// a timeout, or a signal that there's data
+			select {
+			case <-time.After(yieldWaitTimeout):
+				q.unscheduledCond.Broadcast() // abort Wait()
+				<-signalled                   // wait for the go-routine, and the lock
+				log.V(4).Infoln("timed out waiting for a pod to yield")
+			case <-signalled:
+				// we have acquired the lock, and there
+				// may be a pod for us to pop now
+			}
+			continue
+		}
+
+		pod := kpod.(*Pod).Pod
+		if podName, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
+			log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
+		} else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
+			log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
+		} else if recoverAssignedSlave(pod) != "" {
+			// should never happen if enqueuePods is filtering properly
+			log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod)
+		} else {
+			return pod
+		}
+	}
+}
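
A note on the trickiest piece of queuer.Run() and yield() above: sync.Cond.Wait
cannot time out, so both methods run Wait in a helper go-routine (runtime.After)
and race its completion channel against a timer, broadcasting on timeout to
abort the Wait and then draining the channel to know the lock has been
reacquired. A self-contained sketch of that pattern follows; the names are
stand-ins, not the scheduler's runtime package.

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// after runs f (which is expected to block) in a go-routine and closes the
	// returned channel once f returns; cf. the runtime.After helper used above.
	func after(f func()) <-chan struct{} {
		ch := make(chan struct{})
		go func() {
			defer close(ch)
			f()
		}()
		return ch
	}

	func main() {
		var lock sync.Mutex
		cond := sync.NewCond(&lock)

		lock.Lock()
		defer lock.Unlock()

		// cond.Wait releases the lock while blocked; we race its completion
		// against a timer, exactly as Run()/yield() do with deltaCond and
		// unscheduledCond.
		signalled := after(cond.Wait)
		select {
		case <-time.After(500 * time.Millisecond):
			cond.Broadcast() // abort Wait()
			<-signalled      // Wait() has returned: the lock is held again
			fmt.Println("timed out waiting for a signal")
		case <-signalled:
			fmt.Println("woken by a broadcast; lock held again")
		}
	}
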