From a9f80d7383c965ed02ed746f257f0366d9a13e7e Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Sun, 25 Oct 2015 15:21:01 -0700 Subject: [PATCH] Move mesosScheduler.NewPluginConfig from plugin.go to mesos_scheduler.go --- .../mesos/pkg/scheduler/mesos_scheduler.go | 63 +++++++++++++++++++ contrib/mesos/pkg/scheduler/plugin.go | 58 ----------------- 2 files changed, 63 insertions(+), 58 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/mesos_scheduler.go b/contrib/mesos/pkg/scheduler/mesos_scheduler.go index 236ad2136b9..ca4f762f08f 100644 --- a/contrib/mesos/pkg/scheduler/mesos_scheduler.go +++ b/contrib/mesos/pkg/scheduler/mesos_scheduler.go @@ -28,22 +28,28 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" mutil "github.com/mesos/mesos-go/mesosutil" bindings "github.com/mesos/mesos-go/scheduler" + "k8s.io/kubernetes/contrib/mesos/pkg/backoff" execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/offers" offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/proc" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/operations" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/slave" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubelet/container" @@ -51,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/util/sets" + plugin "k8s.io/kubernetes/plugin/pkg/scheduler" ) type PluginInterface interface { @@ -920,3 +927,59 @@ func (ks *MesosScheduler) recoverTasks() error { } return nil } + +// Create creates a scheduler plugin and all supporting background functions. +func (k *MesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig { + // use ListWatch watching pods using the client by default + return k.NewPluginConfig(terminate, mux, createAllPodsLW(k.client)) +} + +func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux, + podsWatcher *cache.ListWatch) *PluginConfig { + + // Watch and queue pods that need scheduling. + updates := make(chan queue.Entry, k.schedulerConfig.UpdatesBacklog) + podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} + reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0) + + // lock that guards critial sections that involve transferring pods from + // the store (cache) to the scheduling queue; its purpose is to maintain + // an ordering (vs interleaving) of operations that's easier to reason about. + kapi := &mesosSchedulerApiAdapter{mesosScheduler: k} + q := queuer.New(podUpdates) + podDeleter := operations.NewDeleter(kapi, q) + eh := &errorHandler{ + api: kapi, + backoff: backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration), + qr: q, + } + startLatch := make(chan struct{}) + eventBroadcaster := record.NewBroadcaster() + runtime.On(startLatch, func() { + eventBroadcaster.StartRecordingToSink(k.client.Events("")) + reflector.Run() // TODO(jdef) should listen for termination + podDeleter.Run(updates, terminate) + q.Run(terminate) + + q.InstallDebugHandlers(mux) + podtask.InstallDebugHandlers(k.taskRegistry, mux) + }) + return &PluginConfig{ + Config: &plugin.Config{ + NodeLister: nil, + Algorithm: &schedulerApiAlgorithmAdapter{ + api: kapi, + podUpdates: podUpdates, + }, + Binder: operations.NewBinder(kapi), + NextPod: q.Yield, + Error: eh.handleSchedulingError, + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), + }, + api: kapi, + client: k.client, + qr: q, + deleter: podDeleter, + starting: startLatch, + } +} diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index eec7391b5fb..59464d7c90d 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -18,7 +18,6 @@ package scheduler import ( "fmt" - "net/http" "sync" "time" @@ -38,7 +37,6 @@ import ( "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/util" @@ -270,62 +268,6 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error) } } -// Create creates a scheduler plugin and all supporting background functions. -func (k *MesosScheduler) NewDefaultPluginConfig(terminate <-chan struct{}, mux *http.ServeMux) *PluginConfig { - // use ListWatch watching pods using the client by default - return k.NewPluginConfig(terminate, mux, createAllPodsLW(k.client)) -} - -func (k *MesosScheduler) NewPluginConfig(terminate <-chan struct{}, mux *http.ServeMux, - podsWatcher *cache.ListWatch) *PluginConfig { - - // Watch and queue pods that need scheduling. - updates := make(chan queue.Entry, k.schedulerConfig.UpdatesBacklog) - podUpdates := &podStoreAdapter{queue.NewHistorical(updates)} - reflector := cache.NewReflector(podsWatcher, &api.Pod{}, podUpdates, 0) - - // lock that guards critial sections that involve transferring pods from - // the store (cache) to the scheduling queue; its purpose is to maintain - // an ordering (vs interleaving) of operations that's easier to reason about. - kapi := &mesosSchedulerApiAdapter{mesosScheduler: k} - q := queuer.New(podUpdates) - podDeleter := operations.NewDeleter(kapi, q) - eh := &errorHandler{ - api: kapi, - backoff: backoff.New(k.schedulerConfig.InitialPodBackoff.Duration, k.schedulerConfig.MaxPodBackoff.Duration), - qr: q, - } - startLatch := make(chan struct{}) - eventBroadcaster := record.NewBroadcaster() - runtime.On(startLatch, func() { - eventBroadcaster.StartRecordingToSink(k.client.Events("")) - reflector.Run() // TODO(jdef) should listen for termination - podDeleter.Run(updates, terminate) - q.Run(terminate) - - q.InstallDebugHandlers(mux) - podtask.InstallDebugHandlers(k.taskRegistry, mux) - }) - return &PluginConfig{ - Config: &plugin.Config{ - NodeLister: nil, - Algorithm: &schedulerApiAlgorithmAdapter{ - api: kapi, - podUpdates: podUpdates, - }, - Binder: operations.NewBinder(kapi), - NextPod: q.Yield, - Error: eh.handleSchedulingError, - Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), - }, - api: kapi, - client: k.client, - qr: q, - deleter: podDeleter, - starting: startLatch, - } -} - type PluginConfig struct { *plugin.Config api schedapi.SchedulerApi