diff --git a/contrib/mesos/pkg/scheduler/mesos_scheduler.go b/contrib/mesos/pkg/scheduler/mesos_scheduler.go index c099305d835..73354d0625b 100644 --- a/contrib/mesos/pkg/scheduler/mesos_scheduler.go +++ b/contrib/mesos/pkg/scheduler/mesos_scheduler.go @@ -939,11 +939,8 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se scheduler := &mesosSchedulerApiAdapter{mesosScheduler: k} q := queuer.New(podUpdates) podDeleter := operations.NewDeleter(scheduler, q) - eh := &errorHandler{ - scheduler: scheduler, - backoff: backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration), - qr: q, - } + bo := backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration) + eh := operations.NewErrorHandler(scheduler, bo, q) startLatch := make(chan struct{}) eventBroadcaster := record.NewBroadcaster() runtime.On(startLatch, func() { @@ -964,7 +961,7 @@ func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.Se }, Binder: operations.NewBinder(scheduler), NextPod: q.Yield, - Error: eh.handleSchedulingError, + Error: eh.Error, Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), }, scheduler: scheduler, diff --git a/contrib/mesos/pkg/scheduler/operations/errorhandler.go b/contrib/mesos/pkg/scheduler/operations/errorhandler.go new file mode 100644 index 00000000000..b73d5c5773d --- /dev/null +++ b/contrib/mesos/pkg/scheduler/operations/errorhandler.go @@ -0,0 +1,107 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import ( + log "github.com/golang/glog" + mesos "github.com/mesos/mesos-go/mesosproto" + "k8s.io/kubernetes/contrib/mesos/pkg/backoff" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" + schedapi "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/api" + merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podschedulers" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util" +) + +type ErrorHandler struct { + scheduler schedapi.Scheduler + backoff *backoff.Backoff + qr *queuer.Queuer +} + +func NewErrorHandler(scheduler schedapi.Scheduler, backoff *backoff.Backoff, qr *queuer.Queuer) *ErrorHandler { + return &ErrorHandler{ + scheduler: scheduler, + backoff: backoff, + qr: qr, + } +} + +// implementation of scheduling plugin's Error func; see plugin/pkg/scheduler +func (k *ErrorHandler) Error(pod *api.Pod, schedulingErr error) { + + if schedulingErr == merrors.NoSuchPodErr { + log.V(2).Infof("Not rescheduling non-existent pod %v", pod.Name) + return + } + + log.Infof("Error scheduling %v: %v; retrying", pod.Name, schedulingErr) + defer util.HandleCrash() + + // default upstream scheduler passes pod.Name as binding.PodID + ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) + podKey, err := podtask.MakePodKey(ctx, pod.Name) + if err != nil { + log.Errorf("Failed to construct pod key, aborting scheduling for pod %v: %v", pod.Name, err) + return + } + + k.backoff.GC() + k.scheduler.Lock() + defer k.scheduler.Unlock() + + switch task, state := k.scheduler.Tasks().ForPod(podKey); state { + case podtask.StateUnknown: + // if we don't have a mapping here any more then someone deleted the pod + log.V(2).Infof("Could not resolve pod to task, aborting pod reschdule: %s", podKey) + return + + case podtask.StatePending: + if task.Has(podtask.Launched) { + log.V(2).Infof("Skipping re-scheduling for already-launched pod %v", podKey) + return + } + breakoutEarly := queue.BreakChan(nil) + if schedulingErr == podschedulers.NoSuitableOffersErr { + log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) + breakoutEarly = queue.BreakChan(k.scheduler.Offers().Listen(podKey, func(offer *mesos.Offer) bool { + k.scheduler.Lock() + defer k.scheduler.Unlock() + switch task, state := k.scheduler.Tasks().Get(task.ID); state { + case podtask.StatePending: + // Assess fitness of pod with the current offer. The scheduler normally + // "backs off" when it can't find an offer that matches up with a pod. + // The backoff period for a pod can terminate sooner if an offer becomes + // available that matches up. + return !task.Has(podtask.Launched) && k.scheduler.PodScheduler().FitPredicate()(task, offer, nil) + default: + // no point in continuing to check for matching offers + return true + } + })) + } + delay := k.backoff.Get(podKey) + log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay) + k.qr.Requeue(&queuer.Pod{Pod: pod, Delay: &delay, Notify: breakoutEarly}) + + default: + log.V(2).Infof("Task is no longer pending, aborting reschedule for pod %v", podKey) + } +} diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index f5203be7276..5b8edb68a10 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -24,7 +24,6 @@ import ( log "github.com/golang/glog" mesos "github.com/mesos/mesos-go/mesosproto" mutil "github.com/mesos/mesos-go/mesosutil" - "k8s.io/kubernetes/contrib/mesos/pkg/backoff" "k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" @@ -39,7 +38,6 @@ import ( "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/util" plugin "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" ) @@ -209,74 +207,6 @@ func (k *schedulerApiAlgorithmAdapter) doSchedule(task *podtask.T, err error) (s } } -type errorHandler struct { - scheduler schedapi.Scheduler - backoff *backoff.Backoff - qr *queuer.Queuer -} - -// implementation of scheduling plugin's Error func; see plugin/pkg/scheduler -func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) { - - if schedulingErr == merrors.NoSuchPodErr { - log.V(2).Infof("Not rescheduling non-existent pod %v", pod.Name) - return - } - - log.Infof("Error scheduling %v: %v; retrying", pod.Name, schedulingErr) - defer util.HandleCrash() - - // default upstream scheduler passes pod.Name as binding.PodID - ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) - podKey, err := podtask.MakePodKey(ctx, pod.Name) - if err != nil { - log.Errorf("Failed to construct pod key, aborting scheduling for pod %v: %v", pod.Name, err) - return - } - - k.backoff.GC() - k.scheduler.Lock() - defer k.scheduler.Unlock() - - switch task, state := k.scheduler.Tasks().ForPod(podKey); state { - case podtask.StateUnknown: - // if we don't have a mapping here any more then someone deleted the pod - log.V(2).Infof("Could not resolve pod to task, aborting pod reschdule: %s", podKey) - return - - case podtask.StatePending: - if task.Has(podtask.Launched) { - log.V(2).Infof("Skipping re-scheduling for already-launched pod %v", podKey) - return - } - breakoutEarly := queue.BreakChan(nil) - if schedulingErr == podschedulers.NoSuitableOffersErr { - log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) - breakoutEarly = queue.BreakChan(k.scheduler.Offers().Listen(podKey, func(offer *mesos.Offer) bool { - k.scheduler.Lock() - defer k.scheduler.Unlock() - switch task, state := k.scheduler.Tasks().Get(task.ID); state { - case podtask.StatePending: - // Assess fitness of pod with the current offer. The scheduler normally - // "backs off" when it can't find an offer that matches up with a pod. - // The backoff period for a pod can terminate sooner if an offer becomes - // available that matches up. - return !task.Has(podtask.Launched) && k.scheduler.PodScheduler().FitPredicate()(task, offer, nil) - default: - // no point in continuing to check for matching offers - return true - } - })) - } - delay := k.backoff.Get(podKey) - log.V(3).Infof("requeuing pod %v with delay %v", podKey, delay) - k.qr.Requeue(&queuer.Pod{Pod: pod, Delay: &delay, Notify: breakoutEarly}) - - default: - log.V(2).Infof("Task is no longer pending, aborting reschedule for pod %v", podKey) - } -} - type PluginConfig struct { *plugin.Config scheduler schedapi.Scheduler